Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2013/03/28 06:00:10 UTC

svn commit: r1461952 [3/6] - in /hadoop/common/branches/branch-1: ./ bin/ src/docs/src/documentation/content/xdocs/ src/test/ src/test/org/apache/hadoop/tools/distcp2/ src/test/org/apache/hadoop/tools/distcp2/mapred/ src/test/org/apache/hadoop/tools/di...

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,823 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.DistCpOptionSwitch;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.StubContext;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
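+/**
+ * Tests for CopyMapper: copying source paths to the target, skipping files
+ * that already exist, handling file/directory conflicts and permission
+ * failures, and preserving block size, replication, user, group and
+ * permission attributes.
+ */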
+public class TestCopyMapper {
+  private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
+  private static List<Path> pathList = new ArrayList<Path>();
+  private static int nFiles = 0;
+  private static final int FILE_SIZE = 1024;
+
+  private static MiniDFSCluster cluster;
+
+  private static final String SOURCE_PATH = "/tmp/source";
+  private static final String TARGET_PATH = "/tmp/target";
+
+  private static Configuration configuration;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    configuration = getConfigurationForCluster();
+    cluster = new MiniDFSCluster(configuration, 1, true, null);
+  }
+
+  private static Configuration getConfigurationForCluster() throws IOException {
+    Configuration configuration = new Configuration();
+    System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+    configuration.set("hadoop.log.dir", "target/tmp");
+    LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+    return configuration;
+  }
+
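+  /**
+   * Builds the per-test DistCp configuration: the target work and final paths
+   * both point at TARGET_PATH, overwrite is disabled, CRC checks are skipped,
+   * folder sync is enabled, and block-size/replication ("br") preservation is
+   * requested.
+   */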
+  private static Configuration getConfiguration() throws IOException {
+    Configuration configuration = getConfigurationForCluster();
+    final FileSystem fs = cluster.getFileSystem();
+    Path workPath = new Path(TARGET_PATH)
+            .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+            workPath.toString());
+    configuration.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+            workPath.toString());
+    configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+            false);
+    configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+            true);
+    configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
+            true);
+    configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+            "br");
+    return configuration;
+  }
+
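+  /** Builds a small directory tree under SOURCE_PATH with two regular files. */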
+  private static void createSourceData() throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6");
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9");
+  }
+
+  private static void mkdirs(String path) throws Exception {
+    FileSystem fileSystem = cluster.getFileSystem();
+    final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
+                                              fileSystem.getWorkingDirectory());
+    pathList.add(qualifiedPath);
+    fileSystem.mkdirs(qualifiedPath);
+  }
+
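+  /**
+   * Creates a FILE_SIZE-byte file at the given path, using twice the default
+   * block size and replication so attribute-preservation tests can tell the
+   * source settings apart from the target defaults.
+   */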
+  private static void touchFile(String path) throws Exception {
+    FileSystem fs;
+    DataOutputStream outputStream = null;
+    try {
+      fs = cluster.getFileSystem();
+      final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
+                                                      fs.getWorkingDirectory());
+      final long blockSize = fs.getDefaultBlockSize() * 2;
+      outputStream = fs.create(qualifiedPath, true, 0,
+              (short)(fs.getDefaultReplication()*2),
+              blockSize);
+      outputStream.write(new byte[FILE_SIZE]);
+      pathList.add(qualifiedPath);
+      ++nFiles;
+
+      FileStatus fileStatus = fs.getFileStatus(qualifiedPath);
+      System.out.println(fileStatus.getBlockSize());
+      System.out.println(fileStatus.getReplication());
+    }
+    finally {
+      IOUtils.cleanup(null, outputStream);
+    }
+  }
+
+  @Test
+  public void testRun() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+      copyMapper.setup(context);
+
+      for (Path path: pathList) {
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fs.getFileStatus(path), context);
+      }
+
+      // Check that the maps worked.
+      for (Path path : pathList) {
+        final Path targetPath = new Path(path.toString()
+                .replaceAll(SOURCE_PATH, TARGET_PATH));
+        Assert.assertTrue(fs.exists(targetPath));
+        Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+        Assert.assertEquals(fs.getFileStatus(path).getReplication(),
+                fs.getFileStatus(targetPath).getReplication());
+        Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
+                fs.getFileStatus(targetPath).getBlockSize());
+        Assert.assertTrue(!fs.isFile(targetPath) ||
+                fs.getFileChecksum(targetPath).equals(
+                        fs.getFileChecksum(path)));
+      }
+
+      Assert.assertEquals(pathList.size(),
+              stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
+      Assert.assertEquals(nFiles * FILE_SIZE,
+              stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+
+      testCopyingExistingFiles(fs, copyMapper, context);
+      for (Text value : stubContext.getWriter().values()) {
+        Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
+      }
+    }
+    catch (Exception e) {
+      LOG.error("Unexpected exception: ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
+                                        Mapper<Text, FileStatus, Text, Text>.Context context) {
+
+    try {
+      for (Path path : pathList) {
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fs.getFileStatus(path), context);
+      }
+
+      Assert.assertEquals(nFiles,
+              context.getCounter(CopyMapper.Counter.SKIP).getValue());
+    }
+    catch (Exception exception) {
+      Assert.assertTrue("Caught unexpected exception:" + exception.getMessage(),
+              false);
+    }
+  }
+
+  @Test
+  public void testMakeDirFailure() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
+      configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+              workPath);
+      copyMapper.setup(context);
+
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
+              fs.getFileStatus(pathList.get(0)), context);
+
+      Assert.assertTrue("There should have been an exception.", false);
+    }
+    catch (Exception ignore) {
+    }
+  }
+
+  @Test
+  public void testIgnoreFailures() {
+    doTestIgnoreFailures(true);
+    doTestIgnoreFailures(false);
+  }
+
+  @Test
+  public void testDirToFile() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      mkdirs(SOURCE_PATH + "/src/file");
+      touchFile(TARGET_PATH + "/src/file");
+      try {
+        copyMapper.setup(context);
+        copyMapper.map(new Text("/src/file"),
+            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            context);
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testPreserve() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+      
+      final Mapper<Text, FileStatus, Text, Text>.Context context =  tmpUser.
+          doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+        @Override
+        public Mapper<Text, FileStatus, Text, Text>.Context run() {
+          try {
+            StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+            return stubContext.getContext();
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH);
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.fail("Expected copy to fail");
+          } catch (AccessControlException e) {
+            Assert.assertTrue("Got exception: " + e.getMessage(), true);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCopyReadableFiles() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context =  tmpUser.
+          doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+        @Override
+        public Mapper<Text, FileStatus, Text, Text>.Context run() {
+          try {
+            StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+            return stubContext.getContext();
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH);
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSkipCopyNoPerms() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final StubContext stubContext =  tmpUser.
+          doAs(new PrivilegedAction<StubContext>() {
+        @Override
+        public StubContext run() {
+          try {
+            return new StubContext(getConfiguration(), null, 0);
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      touchFile(TARGET_PATH + "/src/file");
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.assertEquals(stubContext.getWriter().values().size(), 1);
+            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
+            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().
+                contains(SOURCE_PATH + "/src/file"));
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFailCopyWithAccessControlException() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final StubContext stubContext =  tmpUser.
+          doAs(new PrivilegedAction<StubContext>() {
+        @Override
+        public StubContext run() {
+          try {
+            return new StubContext(getConfiguration(), null, 0);
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+      
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      OutputStream out = cluster.getFileSystem().create(new Path(TARGET_PATH + "/src/file"));
+      out.write("hello world".getBytes());
+      out.close();
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.fail("Didn't expect the file to be copied");
+          } catch (AccessControlException ignore) {
+          } catch (Exception e) {
+            if (e.getCause() == null || !(e.getCause() instanceof AccessControlException)) {
+              throw new RuntimeException(e);
+            }
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFileToDir() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH + "/src/file");
+      try {
+        copyMapper.setup(context);
+        copyMapper.map(new Text("/src/file"),
+            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            context);
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
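+  /**
+   * Deletes each source file after capturing its FileStatus, so every map
+   * attempt fails. With ignoreFailures set, the failures should be recorded
+   * as FAIL: entries; without it, the mapper is expected to throw.
+   */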
+  private void doTestIgnoreFailures(boolean ignoreFailures) {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      configuration.setBoolean(
+              DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), ignoreFailures);
+      configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+              true);
+      configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+              true);
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        if (!fileStatus.isDir()) {
+          fs.delete(path, true);
+          copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                  fileStatus, context);
+        }
+      }
+      if (ignoreFailures) {
+        for (Text value : stubContext.getWriter().values()) {
+          Assert.assertTrue(value.toString() + " is not a failure entry", value.toString().startsWith("FAIL:"));
+        }
+      }
+      Assert.assertTrue("There should have been an exception.", ignoreFailures);
+    }
+    catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e.getMessage(),
+              !ignoreFailures);
+      e.printStackTrace();
+    }
+  }
+
+  private static void deleteState() throws IOException {
+    pathList.clear();
+    nFiles = 0;
+    cluster.getFileSystem().delete(new Path(SOURCE_PATH), true);
+    cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
+  }
+
+  @Test
+  public void testPreserveBlockSizeAndReplication() {
+    testPreserveBlockSizeAndReplicationImpl(true);
+    testPreserveBlockSizeAndReplicationImpl(false);
+  }
+
+  private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
+    try {
+
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+              = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      if (preserve) {
+        fileAttributes.add(DistCpOptions.FileAttribute.BLOCKSIZE);
+        fileAttributes.add(DistCpOptions.FileAttribute.REPLICATION);
+      }
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+              DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fileStatus, context);
+      }
+
+      // Check that block-size and replication are preserved only when requested.
+      for (Path path : pathList) {
+        final Path targetPath = new Path(path.toString()
+                .replaceAll(SOURCE_PATH, TARGET_PATH));
+        final FileStatus source = fs.getFileStatus(path);
+        final FileStatus target = fs.getFileStatus(targetPath);
+        if (!source.isDir() ) {
+          Assert.assertTrue(preserve ||
+                  source.getBlockSize() != target.getBlockSize());
+          Assert.assertTrue(preserve ||
+                  source.getReplication() != target.getReplication());
+          Assert.assertTrue(!preserve ||
+                  source.getBlockSize() == target.getBlockSize());
+          Assert.assertTrue(!preserve ||
+                  source.getReplication() == target.getReplication());
+        }
+      }
+    }
+    catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e.getMessage(), false);
+      e.printStackTrace();
+    }
+  }
+
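+  /**
+   * Re-owns every regular file in pathList to the given user/group and opens
+   * its permissions, giving the preserve-user/group test attributes that
+   * differ from the target defaults.
+   */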
+  private static void changeUserGroup(String user, String group)
+          throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    FsPermission changedPermission = new FsPermission(
+            FsAction.ALL, FsAction.ALL, FsAction.ALL
+    );
+    for (Path path : pathList)
+      if (fs.isFile(path)) {
+        fs.setOwner(path, user, group);
+        fs.setPermission(path, changedPermission);
+      }
+  }
+
+  /**
+   * If a single file is being copied to a location where the file (of the same
+   * name) already exists, then the file shouldn't be skipped.
+   */
+  @Test
+  public void testSingleFileCopy() {
+    try {
+      deleteState();
+      touchFile(SOURCE_PATH + "/1");
+      Path sourceFilePath = pathList.get(0);
+      Path targetFilePath = new Path(sourceFilePath.toString().replaceAll(
+              SOURCE_PATH, TARGET_PATH));
+      touchFile(targetFilePath.toString());
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      context.getConfiguration().set(
+              DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+              targetFilePath.getParent().toString()); // Parent directory.
+      copyMapper.setup(context);
+
+      final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
+
+      long before = fs.getFileStatus(targetFilePath).getModificationTime();
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+              new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+      long after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+      Assert.assertTrue("File should have been skipped", before == after);
+
+      context.getConfiguration().set(
+              DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+              targetFilePath.toString()); // Specify the file path.
+      copyMapper.setup(context);
+
+      before = fs.getFileStatus(targetFilePath).getModificationTime();
+      try { Thread.sleep(2); } catch (Throwable ignore) {}
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+              new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+      after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+      Assert.assertTrue("File should have been overwritten.", before < after);
+
+    } catch (Exception exception) {
+      Assert.fail("Unexpected exception: " + exception.getMessage());
+      exception.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testPreserveUserGroup() {
+    testPreserveUserGroupImpl(true);
+    testPreserveUserGroupImpl(false);
+  }
+
+  private void testPreserveUserGroupImpl(boolean preserve){
+    try {
+
+      deleteState();
+      createSourceData();
+      changeUserGroup("Michael", "Corleone");
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+              = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      if (preserve) {
+        fileAttributes.add(DistCpOptions.FileAttribute.USER);
+        fileAttributes.add(DistCpOptions.FileAttribute.GROUP);
+        fileAttributes.add(DistCpOptions.FileAttribute.PERMISSION);
+      }
+
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+              DistCpUtils.packAttributes(fileAttributes));
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fileStatus, context);
+      }
+
+      // Check that the user/group attributes are preserved
+      // (only) as necessary.
+      for (Path path : pathList) {
+        final Path targetPath = new Path(path.toString()
+                .replaceAll(SOURCE_PATH, TARGET_PATH));
+        final FileStatus source = fs.getFileStatus(path);
+        final FileStatus target = fs.getFileStatus(targetPath);
+        if (!source.isDir()) {
+          Assert.assertTrue(!preserve || source.getOwner().equals(target.getOwner()));
+          Assert.assertTrue(!preserve || source.getGroup().equals(target.getGroup()));
+          Assert.assertTrue(!preserve || source.getPermission().equals(target.getPermission()));
+          Assert.assertTrue( preserve || !source.getOwner().equals(target.getOwner()));
+          Assert.assertTrue( preserve || !source.getGroup().equals(target.getGroup()));
+          Assert.assertTrue( preserve || !source.getPermission().equals(target.getPermission()));
+          Assert.assertTrue(source.isDir() ||
+                  source.getReplication() != target.getReplication());
+        }
+      }
+    }
+    catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e.getMessage(), false);
+      e.printStackTrace();
+    }
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
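+/**
+ * Tests for CopyOutputFormat: the commit- and working-directory accessors,
+ * the committer type returned by getOutputCommitter(), and validation of the
+ * work/commit paths in checkOutputSpecs().
+ */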
+public class TestCopyOutputFormat {
+  private static final Log LOG = LogFactory.getLog(TestCopyOutputFormat.class);
+
+  @Test
+  public void testSetCommitDirectory() {
+    try {
+      Job job = Job.getInstance(new Configuration());
+      Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, "");
+      Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+      Path directory = new Path("/tmp/test");
+      CopyOutputFormat.setCommitDirectory(job, directory);
+      Assert.assertEquals(directory, CopyOutputFormat.getCommitDirectory(job));
+      Assert.assertEquals(directory.toString(), job.getConfiguration().
+          get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running test", e);
+      Assert.fail("Failed while testing for set Commit Directory");
+    }
+  }
+
+  @Test
+  public void testSetWorkingDirectory() {
+    try {
+      Job job = Job.getInstance(new Configuration());
+      Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+      Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+      Path directory = new Path("/tmp/test");
+      CopyOutputFormat.setWorkingDirectory(job, directory);
+      Assert.assertEquals(directory, CopyOutputFormat.getWorkingDirectory(job));
+      Assert.assertEquals(directory.toString(), job.getConfiguration().
+          get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running test", e);
+      Assert.fail("Failed while testing for set Working Directory");
+    }
+  }
+
+  @Test
+  public void testGetOutputCommitter() {
+    try {
+      TaskAttemptContext context = new TaskAttemptContext(new Configuration(),
+        new TaskAttemptID("200707121733", 1, true, 1, 1));
+      context.getConfiguration().set("mapred.output.dir", "/out");
+      Assert.assertTrue(new CopyOutputFormat().getOutputCommitter(context) instanceof CopyCommitter);
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Unable to get output committer");
+    }
+  }
+
+  @Test
+  public void testCheckOutputSpecs() {
+    try {
+      OutputFormat outputFormat = new CopyOutputFormat();
+      Job job = Job.getInstance(new Configuration());
+      JobID jobID = new JobID("200707121733", 1);
+
+      try {
+        JobContext context = new JobContext(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+        Assert.fail("No checking for invalid work/commit path");
+      } catch (IllegalStateException ignore) { }
+
+      CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+      try {
+        JobContext context = new JobContext(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+        Assert.fail("No checking for invalid commit path");
+      } catch (IllegalStateException ignore) { }
+
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+      CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+      try {
+        JobContext context = new JobContext(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+        Assert.fail("No checking for invalid work path");
+      } catch (IllegalStateException ignore) { }
+
+      CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+      CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+      try {
+        JobContext context = new JobContext(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+      } catch (IllegalStateException ignore) {
+        Assert.fail("Output spec check failed.");
+      }
+
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing checkoutput specs", e);
+      Assert.fail("Checkoutput Spec failure");
+    } catch (InterruptedException e) {
+      LOG.error("Exception encountered while testing checkoutput specs", e);
+      Assert.fail("Checkoutput Spec failure");
+    }
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.StubContext;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
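+/**
+ * Tests that UniformSizeInputFormat splits a copy listing into contiguous,
+ * roughly equal-sized splits that agree with the legacy DistCp splitting
+ * logic.
+ */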
+public class TestUniformSizeInputFormat {
+  private static final Log LOG
+                = LogFactory.getLog(TestUniformSizeInputFormat.class);
+
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE=1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    final DistCpOptions distCpOptions = new DistCpOptions(sourceList, targetPath);
+    distCpOptions.setMaxMaps(nMaps);
+    return distCpOptions;
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanup(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
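+  /**
+   * Builds a copy listing for the source files, requests splits with nMaps
+   * maps configured, and verifies that the splits cover the listing exactly,
+   * are roughly equal in size, and start at the same offsets as the legacy
+   * splits.
+   */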
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpOptions options = getOptions(nMaps);
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks",
+                      String.valueOf(options.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_1/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, options).
+        buildListing(listFile, options);
+
+    JobContext jobContext = new JobContext(configuration, new JobID());
+    UniformSizeInputFormat uniformSizeInputFormat = new UniformSizeInputFormat();
+    List<InputSplit> splits
+            = uniformSizeInputFormat.getSplits(jobContext);
+
+    List<InputSplit> legacySplits = legacyGetSplits(listFile, nMaps);
+
+    int sizePerMap = totalFileSize/nMaps;
+
+    checkSplits(listFile, splits);
+    checkAgainstLegacy(splits, legacySplits);
+
+    int doubleCheckedTotalSize = 0;
+    int previousSplitSize = -1;
+    for (int i=0; i<splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
+              split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      while (recordReader.nextKeyValue()) {
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        Assert.assertEquals(fileStatus.length, 1);
+        currentSplitSize += fileStatus[0].getLen();
+      }
+      Assert.assertTrue(
+           previousSplitSize == -1
+               || Math.abs(currentSplitSize - previousSplitSize) < 0.1*sizePerMap
+               || i == splits.size()-1);
+
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+  }
+
+  // From
+  // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
+  private List<InputSplit> legacyGetSplits(Path listFile, int numSplits)
+      throws IOException {
+
+    FileSystem fs = cluster.getFileSystem();
+    FileStatus srcst = fs.getFileStatus(listFile);
+    Configuration conf = fs.getConf();
+
+    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+    FileStatus value = new FileStatus();
+    Text key = new Text();
+    final long targetsize = totalFileSize / numSplits;
+    long pos = 0L;
+    long last = 0L;
+    long acc = 0L;
+    long cbrem = srcst.getLen();
+    SequenceFile.Reader sl = null;
+
+    LOG.info("Average bytes per map: " + targetsize +
+        ", Number of maps: " + numSplits + ", total size: " + totalFileSize);
+
+    try {
+      sl = new SequenceFile.Reader(fs, listFile, conf);
+      for (; sl.next(key, value); last = sl.getPosition()) {
+        // if adding this split would put this split past the target size,
+        // cut the last split and put this next file in the next split.
+        if (acc + value.getLen() > targetsize && acc != 0) {
+          long splitsize = last - pos;
+          FileSplit fileSplit = new FileSplit(listFile, pos, splitsize, null);
+          LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + splitsize);
+          splits.add(fileSplit);
+          cbrem -= splitsize;
+          pos = last;
+          acc = 0L;
+        }
+        acc += value.getLen();
+      }
+    }
+    finally {
+      IOUtils.closeStream(sl);
+    }
+    if (cbrem != 0) {
+      FileSplit fileSplit = new FileSplit(listFile, pos, cbrem, null);
+      LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + cbrem);
+      splits.add(fileSplit);
+    }
+
+    return splits;
+  }
+
+  private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+    long lastEnd = 0;
+
+    //Verify if each split's start is matching with the previous end and
+    //we are not missing anything
+    for (InputSplit split : splits) {
+      FileSplit fileSplit = (FileSplit) split;
+      long start = fileSplit.getStart();
+      Assert.assertEquals(lastEnd, start);
+      lastEnd = start + fileSplit.getLength();
+    }
+
+    //Verify there is nothing more to read from the input file
+    FileSystem fs = cluster.getFileSystem();
+    SequenceFile.Reader reader
+        = new SequenceFile.Reader(fs, listFile, fs.getConf());
+
+    try {
+      reader.seek(lastEnd);
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+      Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+
+  private void checkAgainstLegacy(List<InputSplit> splits,
+                                  List<InputSplit> legacySplits)
+      throws IOException, InterruptedException {
+
+    Assert.assertEquals(legacySplits.size(), splits.size());
+    for (int index = 0; index < splits.size(); index++) {
+      FileSplit fileSplit = (FileSplit) splits.get(index);
+      FileSplit legacyFileSplit = (FileSplit) legacySplits.get(index);
+      Assert.assertEquals(fileSplit.getStart(), legacyFileSplit.getStart());
+    }
+  }
+
+  @Test
+  public void testGetSplits() throws Exception {
+    testGetSplits(9);
+    for (int i=1; i<N_FILES; ++i)
+      testGetSplits(i);
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred.lib;
+
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.StubContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
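+/**
+ * Tests DynamicInputFormat's chunked splits over a large copy listing, and
+ * its split-ratio heuristic.
+ */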
+public class TestDynamicInputFormat {
+  private static final Log LOG = LogFactory.getLog(TestDynamicInputFormat.class);
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 1000;
+  private static final int NUM_SPLITS = 7;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  private static List<String> expectedFilePaths = new ArrayList<String>(N_FILES);
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster(getConfigurationForCluster(), 1, true, null);
+
+    for (int i=0; i<N_FILES; ++i)
+      createFile("/tmp/source/" + String.valueOf(i));
+
+  }
+
+  private static Configuration getConfigurationForCluster() {
+    Configuration configuration = new Configuration();
+    System.setProperty("test.build.data",
+                       "target/tmp/build/TEST_DYNAMIC_INPUT_FORMAT/data");
+    configuration.set("hadoop.log.dir", "target/tmp");
+    LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+    return configuration;
+  }
+
+  private static DistCpOptions getOptions() throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+            + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+            + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    DistCpOptions options = new DistCpOptions(sourceList, targetPath);
+    options.setMaxMaps(NUM_SPLITS);
+    return options;
+  }
+
+  private static void createFile(String path) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      expectedFilePaths.add(fileSystem.listStatus(
+                                    new Path(path))[0].getPath().toString());
+    }
+    finally {
+      IOUtils.cleanup(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
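+  /**
+   * Verifies that, across all splits, every record returned is an expected
+   * source path, the total record count matches the number of files created,
+   * and each reader reports non-decreasing progress in [0, 1] ending at 1.0.
+   */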
+  @Test
+  public void testGetSplits() throws Exception {
+    DistCpOptions options = getOptions();
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks",
+                      String.valueOf(options.getMaxMaps()));
+    CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
+            new Path(cluster.getFileSystem().getUri().toString()
+                    +"/tmp/testDynInputFormat/fileList.seq"), options);
+
+    JobContext jobContext = new JobContext(configuration, new JobID());
+    DynamicInputFormat<Text, FileStatus> inputFormat =
+        new DynamicInputFormat<Text, FileStatus>();
+    List<InputSplit> splits = inputFormat.getSplits(jobContext);
+
+    int nFiles = 0;
+    int taskId = 0;
+
+    for (InputSplit split : splits) {
+      RecordReader<Text, FileStatus> recordReader =
+           inputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, taskId);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      
+      recordReader.initialize(splits.get(0), taskAttemptContext);
+      float previousProgressValue = 0f;
+      while (recordReader.nextKeyValue()) {
+        FileStatus fileStatus = recordReader.getCurrentValue();
+        String source = fileStatus.getPath().toString();
+        System.out.println(source);
+        Assert.assertTrue(expectedFilePaths.contains(source));
+        final float progress = recordReader.getProgress();
+        Assert.assertTrue(progress >= previousProgressValue);
+        Assert.assertTrue(progress >= 0.0f);
+        Assert.assertTrue(progress <= 1.0f);
+        previousProgressValue = progress;
+        ++nFiles;
+      }
+      Assert.assertTrue(recordReader.getProgress() == 1.0f);
+
+      ++taskId;
+    }
+
+    Assert.assertEquals(expectedFilePaths.size(), nFiles);
+  }
+
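+  /** Spot-checks DynamicInputFormat.getSplitRatio for representative inputs. */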
+  @Test
+  public void testGetSplitRatio() throws Exception {
+    Assert.assertEquals(1, DynamicInputFormat.getSplitRatio(1, 1000000000));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
+    Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDistCpUtils {
+  private static final Log LOG = LogFactory.getLog(TestDistCpUtils.class);
+
+  private static final Configuration config = new Configuration();
+  private static MiniDFSCluster cluster;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    cluster = new MiniDFSCluster(config, 1, true, null);
+  }
+
+  @AfterClass
+  public static void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGetRelativePathRoot() {
+    Path root = new Path("/tmp/abc");
+    Path child = new Path("/tmp/abc/xyz/file");
+    Assert.assertEquals(DistCpUtils.getRelativePath(root, child), "/xyz/file");
+
+    root = new Path("/");
+    child = new Path("/a");
+    Assert.assertEquals(DistCpUtils.getRelativePath(root, child), "/a");
+  }
+
+  @Test
+  public void testPackAttributes() {
+    EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "");
+
+    attributes.add(FileAttribute.REPLICATION);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "R");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("R"));
+
+    attributes.add(FileAttribute.BLOCKSIZE);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RB");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RB"));
+
+    attributes.add(FileAttribute.USER);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBU");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBU"));
+
+    attributes.add(FileAttribute.GROUP);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBUG");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUG"));
+
+    attributes.add(FileAttribute.PERMISSION);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBUGP");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUGP"));
+  }
+
+  @Test
+  public void testPreserve() {
+    try {
+      FileSystem fs = FileSystem.get(config);
+      EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+
+
+      Path path = new Path("/tmp/abc");
+      Path src = new Path("/tmp/src");
+      fs.mkdirs(path);
+      fs.mkdirs(src);
+      FileStatus srcStatus = fs.getFileStatus(src);
+
+      FsPermission noPerm = new FsPermission((short) 0);
+      fs.setPermission(path, noPerm);
+      fs.setOwner(path, "nobody", "nobody");
+
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      FileStatus target = fs.getFileStatus(path);
+      Assert.assertEquals(target.getPermission(), noPerm);
+      Assert.assertEquals(target.getOwner(), "nobody");
+      Assert.assertEquals(target.getGroup(), "nobody");
+
+      attributes.add(FileAttribute.PERMISSION);
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      target = fs.getFileStatus(path);
+      Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
+      Assert.assertEquals(target.getOwner(), "nobody");
+      Assert.assertEquals(target.getGroup(), "nobody");
+
+      attributes.add(FileAttribute.GROUP);
+      attributes.add(FileAttribute.USER);
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      target = fs.getFileStatus(path);
+      Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
+      Assert.assertEquals(target.getOwner(), srcStatus.getOwner());
+      Assert.assertEquals(target.getGroup(), srcStatus.getGroup());
+
+      fs.delete(path, true);
+      fs.delete(src, true);
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Preserve test failure");
+    }
+  }
+
+  private static Random rand = new Random();
+
+  public static String createTestSetup(FileSystem fs) throws IOException {
+    return createTestSetup("/tmp1", fs, FsPermission.getDefault());
+  }
+  
+  public static String createTestSetup(FileSystem fs,
+                                       FsPermission perm) throws IOException {
+    return createTestSetup("/tmp1", fs, perm);
+  }
+
+  public static String createTestSetup(String baseDir,
+                                       FileSystem fs,
+                                       FsPermission perm) throws IOException {
+    String base = getBase(baseDir);
+    fs.mkdirs(new Path(base + "/newTest/hello/world1"));
+    fs.mkdirs(new Path(base + "/newTest/hello/world2/newworld"));
+    fs.mkdirs(new Path(base + "/newTest/hello/world3/oldworld"));
+    fs.setPermission(new Path(base + "/newTest"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world1"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world2"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world2/newworld"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world3"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world3/oldworld"), perm);
+    createFile(fs, base + "/newTest/1");
+    createFile(fs, base + "/newTest/hello/2");
+    createFile(fs, base + "/newTest/hello/world3/oldworld/3");
+    createFile(fs, base + "/newTest/hello/world2/4");
+    return base;
+  }
+
+  private static String getBase(String base) {
+    String location = String.valueOf(rand.nextLong());
+    return base + "/" + location;
+  }
+
+  public static void delete(FileSystem fs, String path) {
+    try {
+      if (fs != null) {
+        if (path != null) {
+          fs.delete(new Path(path), true);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception encountered ", e);
+    }
+  }
+
+  public static void createFile(FileSystem fs, String filePath) throws IOException {
+    OutputStream out = fs.create(new Path(filePath));
+    IOUtils.closeStream(out);
+  }
+
+  public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase)
+      throws IOException {
+    Path base = new Path(targetBase);
+
+    Stack<Path> stack = new Stack<Path>();
+    stack.push(base);
+    while (!stack.isEmpty()) {
+      Path file = stack.pop();
+      if (!fs.exists(file)) continue;
+      FileStatus[] fStatus = fs.listStatus(file);
+      if (fStatus == null || fStatus.length == 0) continue;
+
+      for (FileStatus status : fStatus) {
+        if (status.isDir()) {
+          stack.push(status.getPath());
+        }
+        Assert.assertTrue(fs.exists(new Path(sourceBase + "/" +
+            DistCpUtils.getRelativePath(new Path(targetBase), status.getPath()))));
+      }
+    }
+    return true;
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TestRetriableCommand {
+
+  private static class MyRetriableCommand extends RetriableCommand {
+
+    private int succeedAfter;
+    private int retryCount = 0;
+
+    public MyRetriableCommand(int succeedAfter) {
+      super("MyRetriableCommand");
+      this.succeedAfter = succeedAfter;
+    }
+
+    public MyRetriableCommand(int succeedAfter, RetryPolicy retryPolicy) {
+      super("MyRetriableCommand", retryPolicy);
+      this.succeedAfter = succeedAfter;
+    }
+
+    @Override
+    protected Object doExecute(Object... arguments) throws Exception {
+      if (++retryCount < succeedAfter)
+        throw new Exception("Transient failure#" + retryCount);
+      return 0;
+    }
+  }
+
+  @Test
+  public void testRetriableCommand() {
+    try {
+      new MyRetriableCommand(5).execute(0);
+      Assert.fail("Expected the transient failures to exhaust the default retry policy");
+    }
+    catch (Exception e) {
+      // Expected: the default policy gives up before the 5th attempt succeeds.
+    }
+
+    try {
+      new MyRetriableCommand(3).execute(0);
+    }
+    catch (Exception e) {
+      Assert.fail("Command should succeed within the default retry count");
+    }
+
+    try {
+      new MyRetriableCommand(5, RetryPolicies.
+          retryUpToMaximumCountWithFixedSleep(5, 0, TimeUnit.MILLISECONDS)).execute(0);
+    }
+    catch (Exception e) {
+      Assert.fail("Command should succeed under the custom retry policy");
+    }
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.*;
+
+public class TestThrottledInputStream {
+  private static final Log LOG = LogFactory.getLog(TestThrottledInputStream.class);
+  private static final int BUFF_SIZE = 1024;
+
+  private enum CB {ONE_C, BUFFER, BUFF_OFFSET}
+
+  @Test
+  public void testRead() {
+    File tmpFile;
+    File outFile;
+    try {
+      tmpFile = createFile(1024);
+      outFile = createFile();
+
+      tmpFile.deleteOnExit();
+      outFile.deleteOnExit();
+
+      long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER);
+
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER);
+/*
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFFER);
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFFER);
+*/
+
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFF_OFFSET);
+/*
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFF_OFFSET);
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFF_OFFSET);
+*/
+
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.ONE_C);
+/*
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.ONE_C);
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.ONE_C);
+*/
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Throttled read test failure");
+    }
+  }
+
+  private long copyAndAssert(File tmpFile, File outFile,
+                             long maxBandwidth, float factor,
+                             int sleepTime, CB flag) throws IOException {
+    long bandwidth;
+    ThrottledInputStream in;
+    long maxBPS = (long) (maxBandwidth / factor);
+
+    if (maxBandwidth == 0) {
+      in = new ThrottledInputStream(new FileInputStream(tmpFile));
+    } else {
+      in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
+    }
+    OutputStream out = new FileOutputStream(outFile);
+    try {
+      if (flag == CB.BUFFER) {
+        copyBytes(in, out, BUFF_SIZE);
+      } else if (flag == CB.BUFF_OFFSET){
+        copyBytesWithOffset(in, out, BUFF_SIZE);
+      } else {
+        copyByteByByte(in, out);
+      }
+
+      LOG.info(in);
+      bandwidth = in.getBytesPerSec();
+      Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
+      Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
+      Assert.assertTrue(in.getTotalSleepTime() >  sleepTime || in.getBytesPerSec() <= maxBPS);
+    } finally {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+    }
+    return bandwidth;
+  }
+
+  private static void copyBytesWithOffset(InputStream in, OutputStream out, int buffSize)
+    throws IOException {
+
+    byte[] buf = new byte[buffSize];
+    int bytesRead = in.read(buf, 0, buffSize);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      bytesRead = in.read(buf, 0, buffSize);
+    }
+  }
+
+  private static void copyByteByByte(InputStream in, OutputStream out)
+    throws IOException {
+
+    int ch = in.read();
+    while (ch >= 0) {
+      out.write(ch);
+      ch = in.read();
+    }
+  }
+
+  private static void copyBytes(InputStream in, OutputStream out, int buffSize)
+    throws IOException {
+
+    byte[] buf = new byte[buffSize];
+    int bytesRead = in.read(buf);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      bytesRead = in.read(buf);
+    }
+  }
+
+  private File createFile(long sizeInKB) throws IOException {
+    File tmpFile = createFile();
+    writeToFile(tmpFile, sizeInKB);
+    return tmpFile;
+  }
+
+  private File createFile() throws IOException {
+    return File.createTempFile("tmp", "dat");
+  }
+
+  private void writeToFile(File tmpFile, long sizeInKB) throws IOException {
+    OutputStream out = new FileOutputStream(tmpFile);
+    try {
+      byte[] buffer = new byte [1024];
+      for (long index = 0; index < sizeInKB; index++) {
+        out.write(buffer);
+      }
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/sslConfig.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/sslConfig.xml?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/sslConfig.xml (added)
+++ hadoop/common/branches/branch-1/src/test/sslConfig.xml Thu Mar 28 05:00:09 2013
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+<property>
+  <name>ssl.client.truststore.location</name>
+  <value>/path/to/truststore/keys/keystore.jks</value>
+  <description>Truststore to be used by clients like distcp. Must be
+  specified.
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.truststore.password</name>
+  <value>changeit</value>
+  <description>Optional. Default value is "".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.truststore.type</name>
+  <value>jks</value>
+  <description>Optional. Default value is "jks".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.location</name>
+  <value>/path/to/keystore/keys/keystore.jks</value>
+  <description>Keystore to be used by clients like distcp. Must be
+  specified.
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.password</name>
+  <value>changeit</value>
+  <description>Optional. Default value is "".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.keypassword</name>
+  <value>changeit</value>
+  <description>Optional. Default value is "".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.type</name>
+  <value>jks</value>
+  <description>Optional. Default value is "jks".
+  </description>
+</property>
+
+</configuration>

Added: hadoop/common/branches/branch-1/src/tools/distcp-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/distcp-default.xml?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/distcp-default.xml (added)
+++ hadoop/common/branches/branch-1/src/tools/distcp-default.xml Thu Mar 28 05:00:09 2013
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly. Anything that needs to be overridden
+     should be overridden through -D switches or a customized conf file. -->
+
+<configuration>
+
+    <property>
+        <name>distcp.dynamic.strategy.impl</name>
+        <value>org.apache.hadoop.tools.distcp2.mapred.lib.DynamicInputFormat</value>
+        <description>Implementation of dynamic input format</description>
+    </property>
+
+    <property>
+        <name>distcp.static.strategy.impl</name>
+        <value>org.apache.hadoop.tools.distcp2.mapred.UniformSizeInputFormat</value>
+        <description>Implementation of static input format</description>
+    </property>
+
+    <property>
+        <name>mapred.job.map.memory.mb</name>
+        <value>1024</value>
+    </property>
+
+    <property>
+        <name>mapred.job.reduce.memory.mb</name>
+        <value>1024</value>
+    </property>
+
+    <property>
+        <name>mapred.reducer.new-api</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.class</name>
+        <value>org.apache.hadoop.mapreduce.Reducer</value>
+    </property>
+
+</configuration>

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+/**
+ * The CopyListing abstraction is responsible for how the list of
+ * sources and targets is constructed, for DistCp's copy function.
+ * The copy-listing should be a SequenceFile<Text, FileStatus>,
+ * located at the path specified to buildListing(),
+ * each entry being a pair of (Source relative path, source file status),
+ * all the paths being fully qualified.
+ */
+public abstract class CopyListing extends Configured {
+
+  private Credentials credentials;
+
+  /**
+   * Build listing function creates the input listing that distcp uses to
+   * perform the copy.
+   *
+   * The built listing is a sequence file that has the relative path of a file as
+   * the key and the file status information of the source file as the value.
+   *
+   * For instance, if the source path is /tmp/data and the traversed path is
+   * /tmp/data/dir1/dir2/file1, then the sequence file would contain
+   *
+   * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+   *
+   * The file also contains directory entries. That is, if /tmp/data/dir1/dir2/file1
+   * is the only file under /tmp/data, the resulting sequence file would contain the
+   * following entries:
+   *
+   * key: /dir1 and value: FileStatus(/tmp/data/dir1)
+   * key: /dir1/dir2 and value: FileStatus(/tmp/data/dir1/dir2)
+   * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+   *
+   * Cases requiring special handling:
+   * If source path is a file (/tmp/file1), contents of the file will be as follows
+   *
+   * TARGET DOES NOT EXIST: Key-"", Value-FileStatus(/tmp/file1)
+   * TARGET IS FILE       : Key-"", Value-FileStatus(/tmp/file1)
+   * TARGET IS DIR        : Key-"/file1", Value-FileStatus(/tmp/file1)  
+   *
+   * @param pathToListFile - Output file where the listing would be stored
+   * @param options - Input options to distcp
+   * @throws IOException - Exception if any
+   */
+  public final void buildListing(Path pathToListFile,
+                                 DistCpOptions options) throws IOException {
+    validatePaths(options);
+    doBuildListing(pathToListFile, options);
+    Configuration config = getConf();
+
+    config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
+    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
+    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
+
+    checkForDuplicates(pathToListFile);
+  }
+
+  /**
+   * Validate input and output paths
+   *
+   * @param options - Input options
+   * @throws InvalidInputException - If the inputs are invalid
+   * @throws IOException - On any failure interacting with the FileSystem
+   */
+  protected abstract void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException;
+
+  /**
+   * The interface to be implemented by sub-classes, to create the source/target file listing.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException - Thrown on failure to create the listing file.
+   */
+  protected abstract void doBuildListing(Path pathToListFile,
+                                         DistCpOptions options) throws IOException;
+
+  /**
+   * Return the total bytes that distcp should copy for the source paths.
+   * This doesn't consider whether a file is identical to the target and
+   * should therefore be skipped during the copy.
+   *
+   * @return total bytes to copy
+   */
+  protected abstract long getBytesToCopy();
+
+  /**
+   * Return the total number of paths to distcp, including directories.
+   * This doesn't consider whether a file/dir is already present at the target
+   * and should be skipped during the copy.
+   *
+   * @return Total number of paths to distcp
+   */
+  protected abstract long getNumberOfPaths();
+
+  /**
+   * Validate the final resulting path listing to see if there are any duplicate entries
+   *
+   * @param pathToListFile - path listing built by doBuildListing
+   * @throws IOException - On any issue encountered while checking for duplicates
+   * @throws DuplicateFileException - If there are duplicate entries
+   */
+  private void checkForDuplicates(Path pathToListFile)
+      throws DuplicateFileException, IOException {
+
+    Configuration config = getConf();
+    FileSystem fs = pathToListFile.getFileSystem(config);
+
+    Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile);
+
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, sortedList, config);
+    try {
+      Text lastKey = new Text("*"); //source relative path can never hold *
+      FileStatus lastFileStatus = new FileStatus();
+
+      Text currentKey = new Text();
+      while (reader.next(currentKey)) {
+        if (currentKey.equals(lastKey)) {
+          FileStatus currentFileStatus = new FileStatus();
+          reader.getCurrentValue(currentFileStatus);
+          throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
+              currentFileStatus.getPath() + " would cause duplicates. Aborting");
+        }
+        reader.getCurrentValue(lastFileStatus);
+        lastKey.set(currentKey);
+      }
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+
+  /**
+   * Protected constructor, to initialize configuration.
+   * @param configuration The input configuration,
+   *                        with which the source/target FileSystems may be accessed.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached.
+   *                      If null, delegation token caching is skipped.
+   */
+  protected CopyListing(Configuration configuration, Credentials credentials) {
+    setConf(configuration);
+    setCredentials(credentials);
+  }
+
+  /**
+   * Set the Credentials store, on which FS delegation tokens will be cached
+   * @param credentials - Credentials object
+   */
+  protected void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+
+  /**
+   * Get the credentials used to update the delegation tokens for accessed FS objects
+   * @return Credentials object
+   */
+  protected Credentials getCredentials() {
+    return credentials;
+  }
+
+  /**
+   * Public Factory method with which the appropriate CopyListing implementation may be retrieved.
+   * @param configuration The input configuration.
+   * @param credentials Credentials object on which the FS delegation tokens are cached
+   * @param options The input Options, to help choose the appropriate CopyListing Implementation.
+   * @return An instance of the appropriate CopyListing implementation.
+   * @throws java.io.IOException - Exception if any
+   */
+  public static CopyListing getCopyListing(Configuration configuration,
+                                           Credentials credentials,
+                                           DistCpOptions options)
+      throws IOException {
+
+    String copyListingClassName = configuration.get(DistCpConstants.
+        CONF_LABEL_COPY_LISTING_CLASS, "");
+    Class<? extends CopyListing> copyListingClass;
+    try {
+      if (! copyListingClassName.isEmpty()) {
+        copyListingClass = configuration.getClass(DistCpConstants.
+            CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class,
+            CopyListing.class);
+      } else {
+        if (options.getSourceFileListing() == null) {
+            copyListingClass = GlobbedCopyListing.class;
+        } else {
+            copyListingClass = FileBasedCopyListing.class;
+        }
+      }
+      copyListingClassName = copyListingClass.getName();
+      Constructor<? extends CopyListing> constructor = copyListingClass.
+          getDeclaredConstructor(Configuration.class, Credentials.class);
+      return constructor.newInstance(configuration, credentials);
+    } catch (Exception e) {
+      throw new IOException("Unable to instantiate " + copyListingClassName, e);
+    }
+  }
+
+  static class DuplicateFileException extends RuntimeException {
+    public DuplicateFileException(String message) {
+      super(message);
+    }
+  }
+
+  static class InvalidInputException extends RuntimeException {
+    public InvalidInputException(String message) {
+      super(message);
+    }
+  }
+}