You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2013/03/28 06:00:10 UTC
svn commit: r1461952 [3/6] - in /hadoop/common/branches/branch-1: ./ bin/
src/docs/src/documentation/content/xdocs/ src/test/
src/test/org/apache/hadoop/tools/distcp2/
src/test/org/apache/hadoop/tools/distcp2/mapred/
src/test/org/apache/hadoop/tools/di...
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,823 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.DistCpOptionSwitch;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.StubContext;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCopyMapper {
+  private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
+  /** Paths created by mkdirs()/touchFile(), in creation order. */
+  private static List<Path> pathList = new ArrayList<Path>();
+  /** Number of regular files created by touchFile() since the last deleteState(). */
+  private static int nFiles = 0;
+  private static final int FILE_SIZE = 1024;
+
+  private static MiniDFSCluster cluster;
+
+  private static final String SOURCE_PATH = "/tmp/source";
+  private static final String TARGET_PATH = "/tmp/target";
+
+  /** Configuration the shared cluster was started with. */
+  private static Configuration configuration;
+
+  /** Starts the single-datanode mini DFS cluster shared by every test. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    configuration = getConfigurationForCluster();
+    cluster = new MiniDFSCluster(configuration, 1, true, null);
+  }
+
+  /** Shuts the shared cluster down so its daemons and ports are released. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Builds the base configuration. Also sets the "test.build.data" system
+   * property to a local directory.
+   */
+  private static Configuration getConfigurationForCluster() throws IOException {
+    Configuration conf = new Configuration();
+    System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+    conf.set("hadoop.log.dir", "target/tmp");
+    LOG.debug("fs.default.name == " + conf.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + conf.get("dfs.http.address"));
+    return conf;
+  }
+
+  /**
+   * Builds a mapper configuration in which both the work path and the final
+   * path are TARGET_PATH qualified against the cluster file system.
+   * Overwrite is off, CRC checks are skipped, folder sync is on and the
+   * preserve-status value is "br".
+   */
+  private static Configuration getConfiguration() throws IOException {
+    Configuration conf = getConfigurationForCluster();
+    final FileSystem fs = cluster.getFileSystem();
+    Path workPath = new Path(TARGET_PATH)
+        .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+        workPath.toString());
+    conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+        workPath.toString());
+    conf.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+        false);
+    conf.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+        true);
+    conf.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
+        true);
+    conf.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+        "br");
+    return conf;
+  }
+
+  /** Creates the standard source tree: several directories plus files 5/6 and 7/8/9. */
+  private static void createSourceData() throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6");
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9");
+  }
+
+  /** Creates a directory on the cluster and records its qualified path in pathList. */
+  private static void mkdirs(String path) throws Exception {
+    FileSystem fileSystem = cluster.getFileSystem();
+    final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
+        fileSystem.getWorkingDirectory());
+    pathList.add(qualifiedPath);
+    fileSystem.mkdirs(qualifiedPath);
+  }
+
+  /**
+   * Writes a FILE_SIZE-byte file of zeros, records it in pathList and
+   * increments nFiles.
+   *
+   * The file is created with twice the default block size and twice the
+   * default replication. This lets the preserve tests detect whether those
+   * attributes were carried over to the copy.
+   */
+  private static void touchFile(String path) throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    DataOutputStream outputStream = null;
+    try {
+      final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
+          fs.getWorkingDirectory());
+      final long blockSize = fs.getDefaultBlockSize() * 2;
+      outputStream = fs.create(qualifiedPath, true, 0,
+          (short) (fs.getDefaultReplication() * 2),
+          blockSize);
+      outputStream.write(new byte[FILE_SIZE]);
+      pathList.add(qualifiedPath);
+      ++nFiles;
+
+      FileStatus fileStatus = fs.getFileStatus(qualifiedPath);
+      LOG.debug("Created " + qualifiedPath + " with block size "
+          + fileStatus.getBlockSize() + " and replication "
+          + fileStatus.getReplication());
+    } finally {
+      IOUtils.cleanup(null, outputStream);
+    }
+  }
+
+  /** Maps a path under SOURCE_PATH to the corresponding path under TARGET_PATH. */
+  private static Path toTargetPath(Path sourcePath) {
+    // String.replace is a literal substitution; replaceAll would treat
+    // SOURCE_PATH as a regular expression.
+    return new Path(sourcePath.toString().replace(SOURCE_PATH, TARGET_PATH));
+  }
+
+  /** Creates a StubContext over getConfiguration() while running as {@code user}. */
+  private static StubContext newStubContextAs(UserGroupInformation user) {
+    return user.doAs(new PrivilegedAction<StubContext>() {
+      @Override
+      public StubContext run() {
+        try {
+          return new StubContext(getConfiguration(), null, 0);
+        } catch (Exception e) {
+          LOG.error("Exception encountered ", e);
+          throw new RuntimeException(e);
+        }
+      }
+    });
+  }
+
+  /** Returns FileSystem.get(configuration), evaluated while running as {@code user}. */
+  private static FileSystem getFileSystemAs(UserGroupInformation user) {
+    return user.doAs(new PrivilegedAction<FileSystem>() {
+      @Override
+      public FileSystem run() {
+        try {
+          return FileSystem.get(configuration);
+        } catch (IOException e) {
+          LOG.error("Exception encountered ", e);
+          throw new RuntimeException("Test failed: " + e.getMessage(), e);
+        }
+      }
+    });
+  }
+
+  @Test
+  public void testRun() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            fs.getFileStatus(path), context);
+      }
+
+      // Check that the maps worked.
+      for (Path path : pathList) {
+        final Path targetPath = toTargetPath(path);
+        Assert.assertTrue(fs.exists(targetPath));
+        Assert.assertEquals(fs.isFile(path), fs.isFile(targetPath));
+        Assert.assertEquals(fs.getFileStatus(path).getReplication(),
+            fs.getFileStatus(targetPath).getReplication());
+        Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
+            fs.getFileStatus(targetPath).getBlockSize());
+        Assert.assertTrue(!fs.isFile(targetPath) ||
+            fs.getFileChecksum(targetPath).equals(
+                fs.getFileChecksum(path)));
+      }
+
+      Assert.assertEquals(pathList.size(),
+          stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
+      Assert.assertEquals(nFiles * FILE_SIZE,
+          stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+
+      testCopyingExistingFiles(fs, copyMapper, context);
+      for (Text value : stubContext.getWriter().values()) {
+        Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
+      }
+    } catch (Exception e) {
+      LOG.error("Unexpected exception: ", e);
+      Assert.fail("Unexpected exception: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Maps every recorded path a second time with the same mapper and context.
+   * Asserts that the SKIP counter equals the number of files created.
+   */
+  private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
+      Mapper<Text, FileStatus, Text, Text>.Context context) {
+    try {
+      for (Path path : pathList) {
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            fs.getFileStatus(path), context);
+      }
+
+      Assert.assertEquals(nFiles,
+          context.getCounter(CopyMapper.Counter.SKIP).getValue());
+    } catch (Exception exception) {
+      LOG.error("Caught unexpected exception", exception);
+      Assert.fail("Caught unexpected exception:" + exception.getMessage());
+    }
+  }
+
+  /**
+   * Points the work path at an hftp URI on localhost:1234 containing glob
+   * characters. Expects setup() or map() to throw.
+   */
+  @Test
+  public void testMakeDirFailure() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration conf = context.getConfiguration();
+      String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
+          .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+          workPath);
+      copyMapper.setup(context);
+
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
+          fs.getFileStatus(pathList.get(0)), context);
+
+      Assert.fail("There should have been an exception.");
+    } catch (Exception expected) {
+      // Expected: the copy must not succeed against this work path.
+    }
+  }
+
+  @Test
+  public void testIgnoreFailures() {
+    doTestIgnoreFailures(true);
+    doTestIgnoreFailures(false);
+  }
+
+  /** Copying a source directory onto an existing target file must be rejected. */
+  @Test
+  public void testDirToFile() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      mkdirs(SOURCE_PATH + "/src/file");
+      touchFile(TARGET_PATH + "/src/file");
+      try {
+        copyMapper.setup(context);
+        copyMapper.map(new Text("/src/file"),
+            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            context);
+        // Without this the test passed silently when no exception was thrown.
+        Assert.fail("Expected IOException when replacing a file with a directory");
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  /**
+   * With every file attribute set to be preserved, a copy run as the
+   * unprivileged "guest" user is expected to fail with AccessControlException.
+   */
+  @Test
+  public void testPreserve() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context =
+          newStubContextAs(tmpUser).getContext();
+
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+          DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH);
+      // 511 == 0777: the target directory is writable by everyone.
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short) 511));
+
+      final FileSystem tmpFS = getFileSystemAs(tmpUser);
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.fail("Expected copy to fail");
+          } catch (AccessControlException expected) {
+            // Expected: presumably "guest" may not apply every preserved
+            // attribute (e.g. ownership) to the copy.
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  /** The "guest" user can copy a world-readable (r--r--r--) source file. */
+  @Test
+  public void testCopyReadableFiles() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context =
+          newStubContextAs(tmpUser).getContext();
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH);
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      // 511 == 0777: the target directory is writable by everyone.
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short) 511));
+
+      final FileSystem tmpFS = getFileSystemAs(tmpUser);
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  /**
+   * The source and target files already exist and both are read-only.
+   * The mapper must emit exactly one "SKIP" record naming the source file.
+   */
+  @Test
+  public void testSkipCopyNoPerms() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final StubContext stubContext = newStubContextAs(tmpUser);
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+          DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      touchFile(TARGET_PATH + "/src/file");
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+      final FileSystem tmpFS = getFileSystemAs(tmpUser);
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.assertEquals(stubContext.getWriter().values().size(), 1);
+            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
+            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().
+                contains(SOURCE_PATH + "/src/file"));
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  /**
+   * A read-only target file whose content differs from the source must not be
+   * overwritten. The mapper must raise an AccessControlException, either
+   * directly or as the cause of another exception.
+   */
+  @Test
+  public void testFailCopyWithAccessControlException() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final StubContext stubContext = newStubContextAs(tmpUser);
+
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+          DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      OutputStream out = cluster.getFileSystem().create(new Path(TARGET_PATH + "/src/file"));
+      try {
+        out.write("hello world".getBytes());
+      } finally {
+        // Close even if write() fails, so the stream is not leaked.
+        out.close();
+      }
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+      final FileSystem tmpFS = getFileSystemAs(tmpUser);
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.fail("Didn't expect the file to be copied");
+          } catch (AccessControlException expected) {
+            // Expected: the read-only target must not be replaced.
+          } catch (Exception e) {
+            // The AccessControlException may arrive wrapped; anything else is a failure.
+            if (e.getCause() == null || !(e.getCause() instanceof AccessControlException)) {
+              throw new RuntimeException(e);
+            }
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  /** Copying a source file onto an existing target directory must be rejected. */
+  @Test
+  public void testFileToDir() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH + "/src/file");
+      try {
+        copyMapper.setup(context);
+        copyMapper.map(new Text("/src/file"),
+            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            context);
+        // Without this the test passed silently when no exception was thrown.
+        Assert.fail("Expected IOException when replacing a directory with a file");
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Deletes each source file just before mapping it, so every file copy fails.
+   * With {@code ignoreFailures} true, the failures must be recorded as "FAIL:"
+   * outputs and no exception may escape. Otherwise map() must throw.
+   */
+  private void doTestIgnoreFailures(boolean ignoreFailures) {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration conf = context.getConfiguration();
+      conf.setBoolean(
+          DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), ignoreFailures);
+      conf.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+          true);
+      conf.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+          true);
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        if (!fileStatus.isDir()) {
+          fs.delete(path, true);
+          copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+              fileStatus, context);
+        }
+      }
+      if (ignoreFailures) {
+        for (Text value : stubContext.getWriter().values()) {
+          Assert.assertTrue(value.toString() + " is not marked as failed",
+              value.toString().startsWith("FAIL:"));
+        }
+      }
+      Assert.assertTrue("There should have been an exception.", ignoreFailures);
+    } catch (Exception e) {
+      if (ignoreFailures) {
+        LOG.error("Unexpected exception: ", e);
+        Assert.fail("Unexpected exception: " + e.getMessage());
+      }
+      // Without ignoreFailures, this exception is the expected outcome.
+      LOG.debug("Caught expected exception", e);
+    }
+  }
+
+  /** Clears the recorded paths and file count, and deletes both test trees. */
+  private static void deleteState() throws IOException {
+    pathList.clear();
+    nFiles = 0;
+    cluster.getFileSystem().delete(new Path(SOURCE_PATH), true);
+    cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
+  }
+
+  @Test
+  public void testPreserveBlockSizeAndReplication() {
+    testPreserveBlockSizeAndReplicationImpl(true);
+    testPreserveBlockSizeAndReplicationImpl(false);
+  }
+
+  /**
+   * Copies the source tree with BLOCKSIZE and REPLICATION preserved (or not).
+   * Checks that the file attributes match the source exactly when
+   * {@code preserve} is set, and differ otherwise.
+   */
+  private void testPreserveBlockSizeAndReplicationImpl(boolean preserve) {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration conf = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      if (preserve) {
+        fileAttributes.add(DistCpOptions.FileAttribute.BLOCKSIZE);
+        fileAttributes.add(DistCpOptions.FileAttribute.REPLICATION);
+      }
+      conf.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+          DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            fileStatus, context);
+      }
+
+      // Block size and replication must match exactly when preserved, and
+      // must differ otherwise.
+      for (Path path : pathList) {
+        final FileStatus source = fs.getFileStatus(path);
+        final FileStatus target = fs.getFileStatus(toTargetPath(path));
+        if (!source.isDir()) {
+          Assert.assertTrue(preserve ||
+              source.getBlockSize() != target.getBlockSize());
+          Assert.assertTrue(preserve ||
+              source.getReplication() != target.getReplication());
+          Assert.assertTrue(!preserve ||
+              source.getBlockSize() == target.getBlockSize());
+          Assert.assertTrue(!preserve ||
+              source.getReplication() == target.getReplication());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unexpected exception: ", e);
+      Assert.fail("Unexpected exception: " + e.getMessage());
+    }
+  }
+
+  /** Sets owner/group and rwxrwxrwx permission on every recorded regular file. */
+  private static void changeUserGroup(String user, String group)
+      throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    FsPermission changedPermission = new FsPermission(
+        FsAction.ALL, FsAction.ALL, FsAction.ALL
+    );
+    for (Path path : pathList) {
+      if (fs.isFile(path)) {
+        fs.setOwner(path, user, group);
+        fs.setPermission(path, changedPermission);
+      }
+    }
+  }
+
+  /**
+   * Tests copying a single file onto an existing file of the same name.
+   * With the final path set to the parent directory, the copy must be skipped.
+   * With the final path set to the file itself, it must be overwritten.
+   */
+  @Test
+  public void testSingleFileCopy() {
+    try {
+      deleteState();
+      touchFile(SOURCE_PATH + "/1");
+      Path sourceFilePath = pathList.get(0);
+      Path targetFilePath = toTargetPath(sourceFilePath);
+      touchFile(targetFilePath.toString());
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      context.getConfiguration().set(
+          DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+          targetFilePath.getParent().toString()); // Parent directory.
+      copyMapper.setup(context);
+
+      final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
+
+      long before = fs.getFileStatus(targetFilePath).getModificationTime();
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+          new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+      long after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+      Assert.assertTrue("File should have been skipped", before == after);
+
+      context.getConfiguration().set(
+          DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+          targetFilePath.toString()); // Specify the file path.
+      copyMapper.setup(context);
+
+      before = fs.getFileStatus(targetFilePath).getModificationTime();
+      // Short pause so an overwrite yields a strictly later modification time.
+      try {
+        Thread.sleep(2);
+      } catch (InterruptedException ignored) {
+        // Best effort only; the assertion below still decides the outcome.
+      }
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+          new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+      after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+      Assert.assertTrue("File should have been overwritten.", before < after);
+    } catch (Exception exception) {
+      LOG.error("Unexpected exception: ", exception);
+      Assert.fail("Unexpected exception: " + exception.getMessage());
+    }
+  }
+
+  @Test
+  public void testPreserveUserGroup() {
+    testPreserveUserGroupImpl(true);
+    testPreserveUserGroupImpl(false);
+  }
+
+  /**
+   * Gives the source files a non-default owner, group and permission, then
+   * copies them with USER, GROUP and PERMISSION preserved (or not). Checks
+   * that the target attributes match only when {@code preserve} is set.
+   */
+  private void testPreserveUserGroupImpl(boolean preserve) {
+    try {
+      deleteState();
+      createSourceData();
+      changeUserGroup("Michael", "Corleone");
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration conf = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      if (preserve) {
+        fileAttributes.add(DistCpOptions.FileAttribute.USER);
+        fileAttributes.add(DistCpOptions.FileAttribute.GROUP);
+        fileAttributes.add(DistCpOptions.FileAttribute.PERMISSION);
+      }
+
+      conf.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+          DistCpUtils.packAttributes(fileAttributes));
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            fileStatus, context);
+      }
+
+      // Check that the user/group attributes are preserved
+      // (only) as necessary.
+      for (Path path : pathList) {
+        final FileStatus source = fs.getFileStatus(path);
+        final FileStatus target = fs.getFileStatus(toTargetPath(path));
+        if (!source.isDir()) {
+          Assert.assertTrue(!preserve || source.getOwner().equals(target.getOwner()));
+          Assert.assertTrue(!preserve || source.getGroup().equals(target.getGroup()));
+          Assert.assertTrue(!preserve || source.getPermission().equals(target.getPermission()));
+          Assert.assertTrue(preserve || !source.getOwner().equals(target.getOwner()));
+          Assert.assertTrue(preserve || !source.getGroup().equals(target.getGroup()));
+          Assert.assertTrue(preserve || !source.getPermission().equals(target.getPermission()));
+          // Replication is not in the preserved set, so it must differ.
+          Assert.assertTrue(source.getReplication() != target.getReplication());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unexpected exception: ", e);
+      Assert.fail("Unexpected exception: " + e.getMessage());
+    }
+  }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCopyOutputFormat {
+ private static final Log LOG = LogFactory.getLog(TestCopyOutputFormat.class);
+
+ @Test
+ public void testSetCommitDirectory() {
+ try {
+ Job job = Job.getInstance(new Configuration());
+ Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, "");
+ Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+ Path directory = new Path("/tmp/test");
+ CopyOutputFormat.setCommitDirectory(job, directory);
+ Assert.assertEquals(directory, CopyOutputFormat.getCommitDirectory(job));
+ Assert.assertEquals(directory.toString(), job.getConfiguration().
+ get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running test", e);
+ Assert.fail("Failed while testing for set Commit Directory");
+ }
+ }
+
+ @Test
+ public void testSetWorkingDirectory() {
+ try {
+ Job job = Job.getInstance(new Configuration());
+ Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+ Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+ Path directory = new Path("/tmp/test");
+ CopyOutputFormat.setWorkingDirectory(job, directory);
+ Assert.assertEquals(directory, CopyOutputFormat.getWorkingDirectory(job));
+ Assert.assertEquals(directory.toString(), job.getConfiguration().
+ get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running test", e);
+ Assert.fail("Failed while testing for set Working Directory");
+ }
+ }
+
+ @Test
+ public void testGetOutputCommitter() {
+ try {
+ TaskAttemptContext context = new TaskAttemptContext(new Configuration(),
+ new TaskAttemptID("200707121733", 1, true, 1, 1));
+ context.getConfiguration().set("mapred.output.dir", "/out");
+ Assert.assertTrue(new CopyOutputFormat().getOutputCommitter(context) instanceof CopyCommitter);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Unable to get output committer");
+ }
+ }
+
+ @Test
+ public void testCheckOutputSpecs() {
+ try {
+ OutputFormat outputFormat = new CopyOutputFormat();
+ Job job = Job.getInstance(new Configuration());
+ JobID jobID = new JobID("200707121733", 1);
+
+ try {
+ JobContext context = new JobContext(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("No checking for invalid work/commit path");
+ } catch (IllegalStateException ignore) { }
+
+ CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+ try {
+ JobContext context = new JobContext(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("No checking for invalid commit path");
+ } catch (IllegalStateException ignore) { }
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+ CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+ try {
+ JobContext context = new JobContext(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("No checking for invalid work path");
+ } catch (IllegalStateException ignore) { }
+
+ CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+ CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+ try {
+ JobContext context = new JobContext(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ } catch (IllegalStateException ignore) {
+ Assert.fail("Output spec check failed.");
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing checkoutput specs", e);
+ Assert.fail("Checkoutput Spec failure");
+ } catch (InterruptedException e) {
+ LOG.error("Exception encountered while testing checkoutput specs", e);
+ Assert.fail("Checkoutput Spec failure");
+ }
+ }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.StubContext;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestUniformSizeInputFormat {
+  private static final Log LOG
+      = LogFactory.getLog(TestUniformSizeInputFormat.class);
+
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  /** Combined size, in bytes, of the files written by setup(). */
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  /**
+   * Starts a one-datanode MiniDFSCluster and writes N_FILES files of random
+   * size under /tmp/source, recording their combined size.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    totalFileSize = 0;
+
+    for (int i = 0; i < N_FILES; ++i) {
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+    }
+  }
+
+  /**
+   * Builds DistCp options that copy /tmp/source to /tmp/target on the mini
+   * cluster using at most {@code nMaps} maps.
+   */
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    final DistCpOptions distCpOptions = new DistCpOptions(sourceList, targetPath);
+    distCpOptions.setMaxMaps(nMaps);
+    return distCpOptions;
+  }
+
+  /**
+   * Writes a zero-filled file at {@code path}.
+   *
+   * @param path     absolute path on the mini cluster
+   * @param fileSize base size; the actual size is random, between
+   *                 {@code fileSize} and {@code 2 * fileSize}
+   * @return the number of bytes written
+   */
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      // Close the stream explicitly, before the file system, so a failure
+      // while completing the file fails setup instead of being swallowed
+      // by IOUtils.cleanup (which is given no log here).
+      outputStream.close();
+      outputStream = null;
+      return size;
+    } finally {
+      IOUtils.cleanup(null, outputStream, fileSystem);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    // cluster stays null if the MiniDFSCluster constructor in setup() failed;
+    // guarding here keeps that original failure from being masked by an NPE.
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Verifies UniformSizeInputFormat splits for {@code nMaps} maps: the splits
+   * are contiguous and cover the whole listing, start at the same offsets as
+   * the legacy DistCp algorithm, and together account for every source byte.
+   */
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpOptions options = getOptions(nMaps);
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks",
+        String.valueOf(options.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_1/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, options).
+        buildListing(listFile, options);
+
+    JobContext jobContext = new JobContext(configuration, new JobID());
+    UniformSizeInputFormat uniformSizeInputFormat = new UniformSizeInputFormat();
+    List<InputSplit> splits
+        = uniformSizeInputFormat.getSplits(jobContext);
+
+    List<InputSplit> legacySplits = legacyGetSplits(listFile, nMaps);
+
+    int sizePerMap = totalFileSize / nMaps;
+
+    checkSplits(listFile, splits);
+    checkAgainstLegacy(splits, legacySplits);
+
+    int doubleCheckedTotalSize = 0;
+    // NOTE(review): previousSplitSize is never updated, so the uniformity
+    // assertion below always passes. Simply tracking it may make the check
+    // too strict, since createFile() sizes vary by up to 2x; confirm the
+    // intended tolerance before tightening it.
+    int previousSplitSize = -1;
+    for (int i = 0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, FileStatus> recordReader =
+          uniformSizeInputFormat.createRecordReader(split, null);
+      try {
+        StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+            recordReader, 0);
+        final TaskAttemptContext taskAttemptContext = stubContext.getContext();
+        recordReader.initialize(split, taskAttemptContext);
+        while (recordReader.nextKeyValue()) {
+          Path sourcePath = recordReader.getCurrentValue().getPath();
+          FileSystem fs = sourcePath.getFileSystem(configuration);
+          FileStatus fileStatus [] = fs.listStatus(sourcePath);
+          Assert.assertEquals(1, fileStatus.length);
+          currentSplitSize += fileStatus[0].getLen();
+        }
+      } finally {
+        // Release whatever the reader holds open for this split.
+        recordReader.close();
+      }
+      Assert.assertTrue(
+          previousSplitSize == -1
+          || Math.abs(currentSplitSize - previousSplitSize) < 0.1 * sizePerMap
+          || i == splits.size() - 1);
+
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+  }
+
+  // From
+  // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
+  // Greedy reference splitter: a split is cut whenever adding the next file
+  // would push the accumulated size past totalFileSize / numSplits.
+  private List<InputSplit> legacyGetSplits(Path listFile, int numSplits)
+      throws IOException {
+
+    FileSystem fs = cluster.getFileSystem();
+    FileStatus srcst = fs.getFileStatus(listFile);
+    Configuration conf = fs.getConf();
+
+    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+    FileStatus value = new FileStatus();
+    Text key = new Text();
+    final long targetsize = totalFileSize / numSplits;
+    long pos = 0L;
+    long last = 0L;
+    long acc = 0L;
+    long cbrem = srcst.getLen();
+    SequenceFile.Reader sl = null;
+
+    LOG.info("Average bytes per map: " + targetsize +
+        ", Number of maps: " + numSplits + ", total size: " + totalFileSize);
+
+    try {
+      sl = new SequenceFile.Reader(fs, listFile, conf);
+      for (; sl.next(key, value); last = sl.getPosition()) {
+        // if adding this split would put this split past the target size,
+        // cut the last split and put this next file in the next split.
+        if (acc + value.getLen() > targetsize && acc != 0) {
+          long splitsize = last - pos;
+          FileSplit fileSplit = new FileSplit(listFile, pos, splitsize, null);
+          LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + splitsize);
+          splits.add(fileSplit);
+          cbrem -= splitsize;
+          pos = last;
+          acc = 0L;
+        }
+        acc += value.getLen();
+      }
+    }
+    finally {
+      IOUtils.closeStream(sl);
+    }
+    if (cbrem != 0) {
+      FileSplit fileSplit = new FileSplit(listFile, pos, cbrem, null);
+      LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + cbrem);
+      splits.add(fileSplit);
+    }
+
+    return splits;
+  }
+
+  /**
+   * Asserts that {@code splits} are contiguous starting at offset 0 and that
+   * no record of {@code listFile} lies beyond the end of the last split.
+   */
+  private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+    long lastEnd = 0;
+
+    // Each split must start exactly where the previous one ended.
+    for (InputSplit split : splits) {
+      FileSplit fileSplit = (FileSplit) split;
+      long start = fileSplit.getStart();
+      Assert.assertEquals(lastEnd, start);
+      lastEnd = start + fileSplit.getLength();
+    }
+
+    // Nothing may remain to be read after the final split.
+    FileSystem fs = cluster.getFileSystem();
+    SequenceFile.Reader reader
+        = new SequenceFile.Reader(fs, listFile, fs.getConf());
+
+    try {
+      reader.seek(lastEnd);
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+      Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+
+  /**
+   * Asserts that {@code splits} has as many entries as {@code legacySplits}
+   * and that corresponding splits start at the same offset.
+   */
+  private void checkAgainstLegacy(List<InputSplit> splits,
+                                  List<InputSplit> legacySplits) {
+
+    Assert.assertEquals(legacySplits.size(), splits.size());
+    for (int index = 0; index < splits.size(); index++) {
+      FileSplit fileSplit = (FileSplit) splits.get(index);
+      FileSplit legacyFileSplit = (FileSplit) legacySplits.get(index);
+      Assert.assertEquals(legacyFileSplit.getStart(), fileSplit.getStart());
+    }
+  }
+
+  @Test
+  public void testGetSplits() throws Exception {
+    testGetSplits(9);
+    for (int i = 1; i < N_FILES; ++i) {
+      testGetSplits(i);
+    }
+  }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred.lib;
+
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.StubContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDynamicInputFormat {
+  private static final Log LOG = LogFactory.getLog(TestDynamicInputFormat.class);
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 1000;
+  private static final int NUM_SPLITS = 7;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  /** Fully-qualified paths of every file created by setup(). */
+  private static List<String> expectedFilePaths = new ArrayList<String>(N_FILES);
+
+  /** Starts a one-datanode MiniDFSCluster and creates N_FILES empty files. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster(getConfigurationForCluster(), 1, true, null);
+
+    for (int i = 0; i < N_FILES; ++i) {
+      createFile("/tmp/source/" + String.valueOf(i));
+    }
+  }
+
+  /**
+   * Returns the configuration used to start the mini cluster. As a side
+   * effect, points the "test.build.data" system property at a test-local
+   * directory.
+   */
+  private static Configuration getConfigurationForCluster() {
+    Configuration configuration = new Configuration();
+    System.setProperty("test.build.data",
+        "target/tmp/build/TEST_DYNAMIC_INPUT_FORMAT/data");
+    configuration.set("hadoop.log.dir", "target/tmp");
+    LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+    return configuration;
+  }
+
+  /** DistCp options copying /tmp/source to /tmp/target with NUM_SPLITS maps. */
+  private static DistCpOptions getOptions() throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    DistCpOptions options = new DistCpOptions(sourceList, targetPath);
+    options.setMaxMaps(NUM_SPLITS);
+    return options;
+  }
+
+  /**
+   * Creates an empty file at {@code path} and records its fully-qualified
+   * path in {@link #expectedFilePaths}.
+   */
+  private static void createFile(String path) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      expectedFilePaths.add(fileSystem.listStatus(
+          new Path(path))[0].getPath().toString());
+    } finally {
+      // Close the stream before the file system that created it.
+      IOUtils.cleanup(null, outputStream, fileSystem);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    // cluster stays null if the MiniDFSCluster constructor in setup() failed.
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Reads every split produced by DynamicInputFormat and checks that each
+   * record is one of the created files, progress is monotonic within
+   * [0, 1] and ends at 1, and the total record count equals N_FILES.
+   */
+  @Test
+  public void testGetSplits() throws Exception {
+    DistCpOptions options = getOptions();
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks",
+        String.valueOf(options.getMaxMaps()));
+    CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
+        new Path(cluster.getFileSystem().getUri().toString()
+            + "/tmp/testDynInputFormat/fileList.seq"), options);
+
+    JobContext jobContext = new JobContext(configuration, new JobID());
+    DynamicInputFormat<Text, FileStatus> inputFormat =
+        new DynamicInputFormat<Text, FileStatus>();
+    List<InputSplit> splits = inputFormat.getSplits(jobContext);
+
+    int nFiles = 0;
+    int taskId = 0;
+
+    for (InputSplit split : splits) {
+      RecordReader<Text, FileStatus> recordReader =
+          inputFormat.createRecordReader(split, null);
+      try {
+        StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+            recordReader, taskId);
+        final TaskAttemptContext taskAttemptContext = stubContext.getContext();
+
+        // Initialize with the same split this reader was created for.
+        recordReader.initialize(split, taskAttemptContext);
+        float previousProgressValue = 0f;
+        while (recordReader.nextKeyValue()) {
+          FileStatus fileStatus = recordReader.getCurrentValue();
+          String source = fileStatus.getPath().toString();
+          LOG.debug(source);
+          Assert.assertTrue("Unexpected source path: " + source,
+              expectedFilePaths.contains(source));
+          final float progress = recordReader.getProgress();
+          Assert.assertTrue(progress >= previousProgressValue);
+          Assert.assertTrue(progress >= 0.0f);
+          Assert.assertTrue(progress <= 1.0f);
+          previousProgressValue = progress;
+          ++nFiles;
+        }
+        Assert.assertEquals(1.0f, recordReader.getProgress(), 0.0f);
+      } finally {
+        recordReader.close();
+      }
+
+      ++taskId;
+    }
+
+    Assert.assertEquals(expectedFilePaths.size(), nFiles);
+  }
+
+  @Test
+  public void testGetSplitRatio() throws Exception {
+    Assert.assertEquals(1, DynamicInputFormat.getSplitRatio(1, 1000000000));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
+    Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+  }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDistCpUtils {
+  private static final Log LOG = LogFactory.getLog(TestDistCpUtils.class);
+
+  private static final Configuration config = new Configuration();
+  private static MiniDFSCluster cluster;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    cluster = new MiniDFSCluster(config, 1, true, null);
+  }
+
+  @AfterClass
+  public static void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGetRelativePathRoot() {
+    Path root = new Path("/tmp/abc");
+    Path child = new Path("/tmp/abc/xyz/file");
+    Assert.assertEquals("/xyz/file", DistCpUtils.getRelativePath(root, child));
+
+    root = new Path("/");
+    child = new Path("/a");
+    Assert.assertEquals("/a", DistCpUtils.getRelativePath(root, child));
+  }
+
+  /** Checks packAttributes/unpackAttributes round-trip as attributes are added. */
+  @Test
+  public void testPackAttributes() {
+    EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+    Assert.assertEquals("", DistCpUtils.packAttributes(attributes));
+
+    attributes.add(FileAttribute.REPLICATION);
+    Assert.assertEquals("R", DistCpUtils.packAttributes(attributes));
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("R"));
+
+    attributes.add(FileAttribute.BLOCKSIZE);
+    Assert.assertEquals("RB", DistCpUtils.packAttributes(attributes));
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RB"));
+
+    attributes.add(FileAttribute.USER);
+    Assert.assertEquals("RBU", DistCpUtils.packAttributes(attributes));
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBU"));
+
+    attributes.add(FileAttribute.GROUP);
+    Assert.assertEquals("RBUG", DistCpUtils.packAttributes(attributes));
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUG"));
+
+    attributes.add(FileAttribute.PERMISSION);
+    Assert.assertEquals("RBUGP", DistCpUtils.packAttributes(attributes));
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUGP"));
+  }
+
+  /**
+   * Checks that DistCpUtils.preserve copies exactly the requested attributes
+   * from the source status onto the target. Any IOException propagates so
+   * JUnit reports the real cause; the test directories are removed either way.
+   */
+  @Test
+  public void testPreserve() throws IOException {
+    FileSystem fs = FileSystem.get(config);
+    EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+
+    Path path = new Path("/tmp/abc");
+    Path src = new Path("/tmp/src");
+    try {
+      fs.mkdirs(path);
+      fs.mkdirs(src);
+      FileStatus srcStatus = fs.getFileStatus(src);
+
+      FsPermission noPerm = new FsPermission((short) 0);
+      fs.setPermission(path, noPerm);
+      fs.setOwner(path, "nobody", "nobody");
+
+      // No attributes requested: the target must be left untouched.
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      FileStatus target = fs.getFileStatus(path);
+      Assert.assertEquals(noPerm, target.getPermission());
+      Assert.assertEquals("nobody", target.getOwner());
+      Assert.assertEquals("nobody", target.getGroup());
+
+      // PERMISSION only: mode is copied, ownership is not.
+      attributes.add(FileAttribute.PERMISSION);
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      target = fs.getFileStatus(path);
+      Assert.assertEquals(srcStatus.getPermission(), target.getPermission());
+      Assert.assertEquals("nobody", target.getOwner());
+      Assert.assertEquals("nobody", target.getGroup());
+
+      // Adding GROUP and USER copies ownership as well.
+      attributes.add(FileAttribute.GROUP);
+      attributes.add(FileAttribute.USER);
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      target = fs.getFileStatus(path);
+      Assert.assertEquals(srcStatus.getPermission(), target.getPermission());
+      Assert.assertEquals(srcStatus.getOwner(), target.getOwner());
+      Assert.assertEquals(srcStatus.getGroup(), target.getGroup());
+    } finally {
+      fs.delete(path, true);
+      fs.delete(src, true);
+    }
+  }
+
+  private static Random rand = new Random();
+
+  /** Same as {@code createTestSetup("/tmp1", fs, FsPermission.getDefault())}. */
+  public static String createTestSetup(FileSystem fs) throws IOException {
+    return createTestSetup("/tmp1", fs, FsPermission.getDefault());
+  }
+
+  /** Same as {@code createTestSetup("/tmp1", fs, perm)}. */
+  public static String createTestSetup(FileSystem fs,
+                                       FsPermission perm) throws IOException {
+    return createTestSetup("/tmp1", fs, perm);
+  }
+
+  /**
+   * Creates a small directory tree under {@code baseDir/<random>/newTest}:
+   * directories hello/world1, hello/world2/newworld and hello/world3/oldworld,
+   * plus four empty files. {@code perm} is applied to newTest and every
+   * directory beneath it.
+   *
+   * @return the randomly named base directory ({@code baseDir/<random>})
+   */
+  public static String createTestSetup(String baseDir,
+                                       FileSystem fs,
+                                       FsPermission perm) throws IOException {
+    String base = getBase(baseDir);
+    fs.mkdirs(new Path(base + "/newTest/hello/world1"));
+    fs.mkdirs(new Path(base + "/newTest/hello/world2/newworld"));
+    fs.mkdirs(new Path(base + "/newTest/hello/world3/oldworld"));
+    fs.setPermission(new Path(base + "/newTest"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world1"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world2"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world2/newworld"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world3"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world3/oldworld"), perm);
+    createFile(fs, base + "/newTest/1");
+    createFile(fs, base + "/newTest/hello/2");
+    createFile(fs, base + "/newTest/hello/world3/oldworld/3");
+    createFile(fs, base + "/newTest/hello/world2/4");
+    return base;
+  }
+
+  /** Appends a random numeric path component to {@code base}. */
+  private static String getBase(String base) {
+    String location = String.valueOf(rand.nextLong());
+    return base + "/" + location;
+  }
+
+  /**
+   * Recursively deletes {@code path}. Best-effort: a null argument is a
+   * no-op and IOExceptions are logged rather than thrown.
+   */
+  public static void delete(FileSystem fs, String path) {
+    if (fs == null || path == null) {
+      return;
+    }
+    try {
+      fs.delete(new Path(path), true);
+    } catch (IOException e) {
+      LOG.warn("Exception encountered ", e);
+    }
+  }
+
+  /** Creates an empty file at {@code filePath}. */
+  public static void createFile(FileSystem fs, String filePath) throws IOException {
+    OutputStream out = fs.create(new Path(filePath));
+    IOUtils.closeStream(out);
+  }
+
+  /**
+   * Walks the tree under {@code targetBase} and asserts that every entry has
+   * a counterpart at the same relative path under {@code sourceBase}. A
+   * mismatch fails via assertion; otherwise returns true.
+   */
+  public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase)
+      throws IOException {
+    Path base = new Path(targetBase);
+
+    Stack<Path> stack = new Stack<Path>();
+    stack.push(base);
+    while (!stack.isEmpty()) {
+      Path file = stack.pop();
+      if (!fs.exists(file)) {
+        continue;
+      }
+      FileStatus[] fStatus = fs.listStatus(file);
+      if (fStatus == null || fStatus.length == 0) {
+        continue;
+      }
+
+      for (FileStatus status : fStatus) {
+        if (status.isDir()) {
+          stack.push(status.getPath());
+        }
+        Assert.assertTrue(fs.exists(new Path(sourceBase + "/" +
+            DistCpUtils.getRelativePath(new Path(targetBase), status.getPath()))));
+      }
+    }
+    return true;
+  }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TestRetriableCommand {
+
+ private static class MyRetriableCommand extends RetriableCommand {
+
+ private int succeedAfter;
+ private int retryCount = 0;
+
+ public MyRetriableCommand(int succeedAfter) {
+ super("MyRetriableCommand");
+ this.succeedAfter = succeedAfter;
+ }
+
+ public MyRetriableCommand(int succeedAfter, RetryPolicy retryPolicy) {
+ super("MyRetriableCommand", retryPolicy);
+ this.succeedAfter = succeedAfter;
+ }
+
+ @Override
+ protected Object doExecute(Object... arguments) throws Exception {
+ if (++retryCount < succeedAfter)
+ throw new Exception("Transient failure#" + retryCount);
+ return 0;
+ }
+ }
+
+ @Test
+ public void testRetriableCommand() {
+ try {
+ new MyRetriableCommand(5).execute(0);
+ Assert.assertTrue(false);
+ }
+ catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+
+
+ try {
+ new MyRetriableCommand(3).execute(0);
+ Assert.assertTrue(true);
+ }
+ catch (Exception e) {
+ Assert.assertTrue(false);
+ }
+
+ try {
+ new MyRetriableCommand(5, RetryPolicies.
+ retryUpToMaximumCountWithFixedSleep(5, 0, TimeUnit.MILLISECONDS)).execute(0);
+ Assert.assertTrue(true);
+ }
+ catch (Exception e) {
+ Assert.assertTrue(false);
+ }
+ }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.*;
+
+public class TestThrottledInputStream {
+  private static final Log LOG = LogFactory.getLog(TestThrottledInputStream.class);
+  private static final int BUFF_SIZE = 1024;
+
+  /**
+   * Read style used by the copy loop: single bytes, read(byte[]), or
+   * read(byte[], int, int).
+   */
+  private enum CB {ONE_C, BUFFER, BUFF_OFFSET}
+
+  /**
+   * Measures an unthrottled baseline, then copies again with each read style
+   * limited to 1/20th of that baseline. IOExceptions propagate so an I/O
+   * failure fails the test instead of passing silently.
+   */
+  @Test
+  public void testRead() throws IOException {
+    File tmpFile = createFile(1024);
+    tmpFile.deleteOnExit();
+    File outFile = createFile();
+    outFile.deleteOnExit();
+
+    // Baseline: maxBandwidth == 0 constructs the stream without a rate limit.
+    long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER);
+
+    copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER);
+    copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFF_OFFSET);
+    copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.ONE_C);
+  }
+
+  /**
+   * Copies {@code tmpFile} to {@code outFile} through a ThrottledInputStream
+   * limited to {@code maxBandwidth / factor} bytes/sec (unlimited when
+   * {@code maxBandwidth} is 0) and checks the byte count and achieved rate.
+   *
+   * @return the bytes-per-second rate reported by the stream
+   */
+  private long copyAndAssert(File tmpFile, File outFile,
+                             long maxBandwidth, float factor,
+                             int sleepTime, CB flag) throws IOException {
+    long bandwidth;
+    ThrottledInputStream in = null;
+    OutputStream out = null;
+    long maxBPS = (long) (maxBandwidth / factor);
+
+    try {
+      if (maxBandwidth == 0) {
+        in = new ThrottledInputStream(new FileInputStream(tmpFile));
+      } else {
+        in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
+      }
+      // Opened inside the try so 'in' is still closed if this fails.
+      out = new FileOutputStream(outFile);
+
+      if (flag == CB.BUFFER) {
+        copyBytes(in, out, BUFF_SIZE);
+      } else if (flag == CB.BUFF_OFFSET) {
+        copyBytesWithOffset(in, out, BUFF_SIZE);
+      } else {
+        copyByteByByte(in, out);
+      }
+
+      LOG.info(in);
+      bandwidth = in.getBytesPerSec();
+      Assert.assertEquals(tmpFile.length(), in.getTotalBytesRead());
+      Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
+      Assert.assertTrue(in.getTotalSleepTime() > sleepTime || in.getBytesPerSec() <= maxBPS);
+    } finally {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+    }
+    return bandwidth;
+  }
+
+  /** Copies using read(byte[], int, int) on every call. */
+  private static void copyBytesWithOffset(InputStream in, OutputStream out, int buffSize)
+      throws IOException {
+
+    byte buf[] = new byte[buffSize];
+    int bytesRead = in.read(buf, 0, buffSize);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      bytesRead = in.read(buf, 0, buffSize);
+    }
+  }
+
+  /** Copies one byte at a time using read(). */
+  private static void copyByteByByte(InputStream in, OutputStream out)
+      throws IOException {
+
+    int ch = in.read();
+    while (ch >= 0) {
+      out.write(ch);
+      ch = in.read();
+    }
+  }
+
+  /** Copies using read(byte[]). */
+  private static void copyBytes(InputStream in, OutputStream out, int buffSize)
+      throws IOException {
+
+    byte buf[] = new byte[buffSize];
+    int bytesRead = in.read(buf);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      bytesRead = in.read(buf);
+    }
+  }
+
+  /** Creates a temp file holding {@code sizeInKB} * 1024 zero bytes. */
+  private File createFile(long sizeInKB) throws IOException {
+    File tmpFile = createFile();
+    writeToFile(tmpFile, sizeInKB);
+    return tmpFile;
+  }
+
+  /** Creates an empty temp file. */
+  private File createFile() throws IOException {
+    return File.createTempFile("tmp", "dat");
+  }
+
+  /** Writes {@code sizeInKB} blocks of 1024 zero bytes to {@code tmpFile}. */
+  private void writeToFile(File tmpFile, long sizeInKB) throws IOException {
+    OutputStream out = new FileOutputStream(tmpFile);
+    try {
+      byte[] buffer = new byte [1024];
+      for (long index = 0; index < sizeInKB; index++) {
+        out.write(buffer);
+      }
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+}
Added: hadoop/common/branches/branch-1/src/test/sslConfig.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/sslConfig.xml?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/sslConfig.xml (added)
+++ hadoop/common/branches/branch-1/src/test/sslConfig.xml Thu Mar 28 05:00:09 2013
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+<property>
+ <name>ssl.client.truststore.location</name>
+ <value>/path/to/truststore/keys/keystore.jks</value>
+ <description>Truststore to be used by clients like distcp. Must be
+ specified.
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.truststore.password</name>
+ <value>changeit</value>
+ <description>Optional. Default value is "".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.truststore.type</name>
+ <value>jks</value>
+ <description>Optional. Default value is "jks".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.location</name>
+ <value>/path/to/keystore/keys/keystore.jks</value>
+ <description>Keystore to be used by clients like distcp. Must be
+ specified.
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.password</name>
+ <value>changeit</value>
+ <description>Optional. Default value is "".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.keypassword</name>
+ <value>changeit</value>
+ <description>Optional. Default value is "".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.type</name>
+ <value>jks</value>
+ <description>Optional. Default value is "jks".
+ </description>
+</property>
+
+</configuration>
Added: hadoop/common/branches/branch-1/src/tools/distcp-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/distcp-default.xml?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/distcp-default.xml (added)
+++ hadoop/common/branches/branch-1/src/tools/distcp-default.xml Thu Mar 28 05:00:09 2013
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly. Any value that needs to be overridden
+ should be set through -D switches or a customized configuration file. -->
+
+<configuration>
+
+  <!-- DistCp v2 classes on this branch live under org.apache.hadoop.tools.distcp2. -->
+
+  <property>
+    <name>distcp.dynamic.strategy.impl</name>
+    <value>org.apache.hadoop.tools.distcp2.mapred.lib.DynamicInputFormat</value>
+    <description>Implementation of dynamic input format</description>
+  </property>
+
+  <property>
+    <name>distcp.static.strategy.impl</name>
+    <value>org.apache.hadoop.tools.distcp2.mapred.UniformSizeInputFormat</value>
+    <description>Implementation of static input format</description>
+  </property>
+
+  <property>
+    <name>mapred.job.map.memory.mb</name>
+    <value>1024</value>
+  </property>
+
+  <property>
+    <name>mapred.job.reduce.memory.mb</name>
+    <value>1024</value>
+  </property>
+
+  <property>
+    <name>mapred.reducer.new-api</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>mapreduce.reduce.class</name>
+    <value>org.apache.hadoop.mapreduce.Reducer</value>
+  </property>
+
+</configuration>
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+/**
+ * Strategy for producing the copy listing that drives DistCp.
+ *
+ * <p>A listing is a {@code SequenceFile<Text, FileStatus>} located at the path given to
+ * {@link #buildListing(Path, DistCpOptions)}. Every record pairs a path relative to its
+ * source root (the key) with the source's {@link FileStatus} (the value); all paths are
+ * fully qualified.
+ */
+public abstract class CopyListing extends Configured {
+
+  /** Cache for FileSystem delegation tokens; null means token caching is skipped. */
+  private Credentials credentials;
+
+  /**
+   * Initializes the listing with the configuration and token cache it should use.
+   *
+   * @param configuration configuration through which the source and target FileSystems
+   *        are accessed
+   * @param credentials cache on which FileSystem delegation tokens are stored; if null,
+   *        delegation-token caching is skipped
+   */
+  protected CopyListing(Configuration configuration, Credentials credentials) {
+    setConf(configuration);
+    setCredentials(credentials);
+  }
+
+  /**
+   * Builds the input listing DistCp uses to perform the copy.
+   *
+   * <p>After building the listing, this method records its location and totals in the
+   * configuration. It then rejects listings with duplicate entries.
+   *
+   * <p>Each record's key is the entry's path relative to its source root; the value is
+   * the source's file status. For example, with source {@code /tmp/data} and traversed
+   * path {@code /tmp/data/dir1/dir2/file1}, the listing holds
+   * <pre>
+   *   key: /dir1/dir2/file1   value: FileStatus(/tmp/data/dir1/dir2/file1)
+   * </pre>
+   * Directories are listed too. If that file is the only one under {@code /tmp/data},
+   * the listing is
+   * <pre>
+   *   key: /dir1             value: FileStatus(/tmp/data/dir1)
+   *   key: /dir1/dir2        value: FileStatus(/tmp/data/dir1/dir2)
+   *   key: /dir1/dir2/file1  value: FileStatus(/tmp/data/dir1/dir2/file1)
+   * </pre>
+   * When the source is a single file ({@code /tmp/file1}):
+   * <pre>
+   *   target does not exist: key ""        value FileStatus(/tmp/file1)
+   *   target is a file:      key ""        value FileStatus(/tmp/file1)
+   *   target is a directory: key "/file1"  value FileStatus(/tmp/file1)
+   * </pre>
+   *
+   * @param pathToListFile output file in which the listing is stored
+   * @param options DistCp input options
+   * @throws IOException on failure to validate, build or read back the listing
+   * @throws DuplicateFileException if two entries share the same relative path
+   */
+  public final void buildListing(Path pathToListFile,
+                                 DistCpOptions options) throws IOException {
+    validatePaths(options);
+    doBuildListing(pathToListFile, options);
+    publishListingDetails(pathToListFile);
+    checkForDuplicates(pathToListFile);
+  }
+
+  /**
+   * Stores the listing location, then the total byte count, then the total record
+   * count in the configuration.
+   *
+   * @param pathToListFile location of the freshly built listing
+   */
+  private void publishListingDetails(Path pathToListFile) {
+    final Configuration conf = getConf();
+    conf.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
+    conf.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
+    conf.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
+  }
+
+  /**
+   * Checks that the source and target paths in {@code options} are acceptable.
+   *
+   * @param options DistCp input options
+   * @throws InvalidInputException if the inputs are invalid
+   * @throws IOException on any FileSystem failure
+   */
+  protected abstract void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException;
+
+  /**
+   * Writes the source/target listing; implemented by each concrete strategy.
+   *
+   * @param pathToListFile path at which the listing file is written
+   * @param options DistCp input options, which identify the source and target paths
+   * @throws IOException if the listing file cannot be created
+   */
+  protected abstract void doBuildListing(Path pathToListFile,
+                                         DistCpOptions options) throws IOException;
+
+  /**
+   * Reports the total number of bytes DistCp should copy for the source paths.
+   * Files that may later be skipped because they are unchanged are still counted.
+   *
+   * @return total bytes to copy
+   */
+  protected abstract long getBytesToCopy();
+
+  /**
+   * Reports how many paths, directories included, DistCp should process.
+   * Paths that may later be skipped because they already exist are still counted.
+   *
+   * @return total number of paths to copy
+   */
+  protected abstract long getNumberOfPaths();
+
+  /**
+   * Fails if the finished listing contains two records with the same key.
+   *
+   * <p>The listing is passed through {@link DistCpUtils#sortListing} and then scanned
+   * once, comparing each key with the previous one. This relies on the sorted listing
+   * being ordered by key, so duplicates end up adjacent (assumption about sortListing;
+   * confirm if it changes).
+   *
+   * @param pathToListFile listing produced by {@link #doBuildListing}
+   * @throws DuplicateFileException if two records share a key
+   * @throws IOException if the listing cannot be sorted or read
+   */
+  private void checkForDuplicates(Path pathToListFile)
+      throws DuplicateFileException, IOException {
+    final Configuration conf = getConf();
+    final FileSystem listingFs = pathToListFile.getFileSystem(conf);
+    final Path sortedListing = DistCpUtils.sortListing(listingFs, conf, pathToListFile);
+
+    final SequenceFile.Reader sortedReader =
+        new SequenceFile.Reader(listingFs, sortedListing, conf);
+    try {
+      // Sentinel: a source-relative path is not expected to contain "*", so the
+      // first real key never compares equal to it.
+      final Text previousKey = new Text("*");
+      final FileStatus previousStatus = new FileStatus();
+      final Text key = new Text();
+
+      while (sortedReader.next(key)) {
+        if (key.equals(previousKey)) {
+          final FileStatus duplicateStatus = new FileStatus();
+          sortedReader.getCurrentValue(duplicateStatus);
+          throw new DuplicateFileException("File " + previousStatus.getPath() + " and " +
+              duplicateStatus.getPath() + " would cause duplicates. Aborting");
+        }
+        sortedReader.getCurrentValue(previousStatus);
+        previousKey.set(key);
+      }
+    } finally {
+      IOUtils.closeStream(sortedReader);
+    }
+  }
+
+  /**
+   * Sets the cache on which FileSystem delegation tokens will be stored.
+   *
+   * @param credentials token cache; may be null to skip caching
+   */
+  protected void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+
+  /**
+   * Returns the cache used to record delegation tokens for the FileSystems accessed.
+   *
+   * @return the token cache, or null if none was supplied
+   */
+  protected Credentials getCredentials() {
+    return credentials;
+  }
+
+  /**
+   * Factory returning the CopyListing implementation to use.
+   *
+   * <p>If {@link DistCpConstants#CONF_LABEL_COPY_LISTING_CLASS} is set, that class is used.
+   * Otherwise {@link GlobbedCopyListing} is chosen when {@code options} carries no
+   * source-file listing, and {@link FileBasedCopyListing} when it does. The chosen class
+   * must declare a {@code (Configuration, Credentials)} constructor.
+   *
+   * @param configuration input configuration
+   * @param credentials cache on which FileSystem delegation tokens are stored
+   * @param options DistCp input options, consulted only when no class is configured
+   * @return a new instance of the selected implementation
+   * @throws IOException if the class cannot be resolved or instantiated
+   */
+  public static CopyListing getCopyListing(Configuration configuration,
+                                           Credentials credentials,
+                                           DistCpOptions options)
+      throws IOException {
+    String listingClassName =
+        configuration.get(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS, "");
+    try {
+      final Class<? extends CopyListing> listingClass;
+      if (listingClassName.isEmpty()) {
+        if (options.getSourceFileListing() != null) {
+          listingClass = FileBasedCopyListing.class;
+        } else {
+          listingClass = GlobbedCopyListing.class;
+        }
+      } else {
+        listingClass = configuration.getClass(
+            DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
+            GlobbedCopyListing.class, CopyListing.class);
+      }
+      // Track the resolved name so a failure below reports the class actually attempted.
+      listingClassName = listingClass.getName();
+      final Constructor<? extends CopyListing> ctor =
+          listingClass.getDeclaredConstructor(Configuration.class, Credentials.class);
+      return ctor.newInstance(configuration, credentials);
+    } catch (Exception e) {
+      throw new IOException("Unable to instantiate " + listingClassName, e);
+    }
+  }
+
+  /** Signals that the listing holds more than one entry for the same relative path. */
+  static class DuplicateFileException extends RuntimeException {
+    public DuplicateFileException(String message) {
+      super(message);
+    }
+  }
+
+  /** Signals that the source or target paths given to DistCp are invalid. */
+  static class InvalidInputException extends RuntimeException {
+    public InvalidInputException(String message) {
+      super(message);
+    }
+  }
+}