Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC

svn commit: r1236045 [5/5] - in /hadoop/common/trunk: hadoop-project/ hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/ hadoop-tools/hadoop-distcp/src/main/ hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,826 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+public class TestCopyMapper {
+  private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
+  private static List<Path> pathList = new ArrayList<Path>();
+  private static int nFiles = 0;
+  private static final int FILE_SIZE = 1024;
+
+  private static MiniDFSCluster cluster;
+
+  private static final String SOURCE_PATH = "/tmp/source";
+  private static final String TARGET_PATH = "/tmp/target";
+
+  private static Configuration configuration;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    configuration = getConfigurationForCluster();
+    cluster = new MiniDFSCluster.Builder(configuration)
+                .numDataNodes(1)
+                .format(true)
+                .build();
+  }
+
+  private static Configuration getConfigurationForCluster() throws IOException {
+    Configuration configuration = new Configuration();
+    System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+    configuration.set("hadoop.log.dir", "target/tmp");
+    LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+    return configuration;
+  }
+
+  private static Configuration getConfiguration() throws IOException {
+    Configuration configuration = getConfigurationForCluster();
+    final FileSystem fs = cluster.getFileSystem();
+    Path workPath = new Path(TARGET_PATH)
+            .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+            workPath.toString());
+    configuration.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+            workPath.toString());
+    configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+            false);
+    configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+            true);
+    configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
+            true);
+    configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+            "br");
+    return configuration;
+  }
+
+  private static void createSourceData() throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6");
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9");
+  }
+
+  private static void mkdirs(String path) throws Exception {
+    FileSystem fileSystem = cluster.getFileSystem();
+    final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
+                                              fileSystem.getWorkingDirectory());
+    pathList.add(qualifiedPath);
+    fileSystem.mkdirs(qualifiedPath);
+  }
+
+  private static void touchFile(String path) throws Exception {
+    FileSystem fs;
+    DataOutputStream outputStream = null;
+    try {
+      fs = cluster.getFileSystem();
+      final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
+                                                      fs.getWorkingDirectory());
+      final long blockSize = fs.getDefaultBlockSize() * 2;
+      outputStream = fs.create(qualifiedPath, true, 0,
+              (short)(fs.getDefaultReplication()*2),
+              blockSize);
+      outputStream.write(new byte[FILE_SIZE]);
+      pathList.add(qualifiedPath);
+      ++nFiles;
+
+      FileStatus fileStatus = fs.getFileStatus(qualifiedPath);
+      System.out.println(fileStatus.getBlockSize());
+      System.out.println(fileStatus.getReplication());
+    }
+    finally {
+      IOUtils.cleanup(null, outputStream);
+    }
+  }
+
+  @Test
+  public void testRun() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+      copyMapper.setup(context);
+
+      for (Path path: pathList) {
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fs.getFileStatus(path), context);
+      }
+
+      // Check that the maps worked.
+      for (Path path : pathList) {
+        final Path targetPath = new Path(path.toString()
+                .replaceAll(SOURCE_PATH, TARGET_PATH));
+        Assert.assertTrue(fs.exists(targetPath));
+        Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+        Assert.assertEquals(fs.getFileStatus(path).getReplication(),
+                fs.getFileStatus(targetPath).getReplication());
+        Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
+                fs.getFileStatus(targetPath).getBlockSize());
+        Assert.assertTrue(!fs.isFile(targetPath) ||
+                fs.getFileChecksum(targetPath).equals(
+                        fs.getFileChecksum(path)));
+      }
+
+      Assert.assertEquals(pathList.size(),
+              stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
+      Assert.assertEquals(nFiles * FILE_SIZE,
+              stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+
+      testCopyingExistingFiles(fs, copyMapper, context);
+      for (Text value : stubContext.getWriter().values()) {
+        Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
+      }
+    }
+    catch (Exception e) {
+      LOG.error("Unexpected exception: ", e);
+      Assert.assertTrue(false);
+    }
+  }
+
+  private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
+                                        Mapper<Text, FileStatus, Text, Text>.Context context) {
+
+    try {
+      for (Path path : pathList) {
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fs.getFileStatus(path), context);
+      }
+
+      Assert.assertEquals(nFiles,
+              context.getCounter(CopyMapper.Counter.SKIP).getValue());
+    }
+    catch (Exception exception) {
+      Assert.assertTrue("Caught unexpected exception:" + exception.getMessage(),
+              false);
+    }
+  }
+
+  @Test
+  public void testMakeDirFailure() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
+      configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+              workPath);
+      copyMapper.setup(context);
+
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
+              fs.getFileStatus(pathList.get(0)), context);
+
+      Assert.assertTrue("There should have been an exception.", false);
+    }
+    catch (Exception ignore) {
+    }
+  }
+
+  @Test
+  public void testIgnoreFailures() {
+    doTestIgnoreFailures(true);
+    doTestIgnoreFailures(false);
+  }
+
+  @Test
+  public void testDirToFile() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      mkdirs(SOURCE_PATH + "/src/file");
+      touchFile(TARGET_PATH + "/src/file");
+      try {
+        copyMapper.setup(context);
+        copyMapper.map(new Text("/src/file"),
+            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            context);
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testPreserve() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+      
+      final Mapper<Text, FileStatus, Text, Text>.Context context =  tmpUser.
+          doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+        @Override
+        public Mapper<Text, FileStatus, Text, Text>.Context run() {
+          try {
+            StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+            return stubContext.getContext();
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH);
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.fail("Expected copy to fail");
+          } catch (AccessControlException e) {
+            Assert.assertTrue("Got exception: " + e.getMessage(), true);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCopyReadableFiles() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context =  tmpUser.
+          doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+        @Override
+        public Mapper<Text, FileStatus, Text, Text>.Context run() {
+          try {
+            StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+            return stubContext.getContext();
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH);
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSkipCopyNoPerms() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final StubContext stubContext =  tmpUser.
+          doAs(new PrivilegedAction<StubContext>() {
+        @Override
+        public StubContext run() {
+          try {
+            return new StubContext(getConfiguration(), null, 0);
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      touchFile(TARGET_PATH + "/src/file");
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.assertEquals(stubContext.getWriter().values().size(), 1);
+            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
+            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().
+                contains(SOURCE_PATH + "/src/file"));
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFailCopyWithAccessControlException() {
+    try {
+      deleteState();
+      createSourceData();
+
+      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+      final CopyMapper copyMapper = new CopyMapper();
+
+      final StubContext stubContext =  tmpUser.
+          doAs(new PrivilegedAction<StubContext>() {
+        @Override
+        public StubContext run() {
+          try {
+            return new StubContext(getConfiguration(), null, 0);
+          } catch (Exception e) {
+            LOG.error("Exception encountered ", e);
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+          EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+      final Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+      
+      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+
+      touchFile(SOURCE_PATH + "/src/file");
+      OutputStream out = cluster.getFileSystem().create(new Path(TARGET_PATH + "/src/file"));
+      out.write("hello world".getBytes());
+      out.close();
+      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+      final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+        @Override
+        public FileSystem run() {
+          try {
+            return FileSystem.get(configuration);
+          } catch (IOException e) {
+            LOG.error("Exception encountered ", e);
+            Assert.fail("Test failed: " + e.getMessage());
+            throw new RuntimeException("Test ought to fail here");
+          }
+        }
+      });
+
+      tmpUser.doAs(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            copyMapper.setup(context);
+            copyMapper.map(new Text("/src/file"),
+                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                context);
+            Assert.fail("Didn't expect the file to be copied");
+          } catch (AccessControlException ignore) {
+          } catch (Exception e) {
+            if (e.getCause() == null || !(e.getCause() instanceof AccessControlException)) {
+              throw new RuntimeException(e);
+            }
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFileToDir() {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      touchFile(SOURCE_PATH + "/src/file");
+      mkdirs(TARGET_PATH + "/src/file");
+      try {
+        copyMapper.setup(context);
+        copyMapper.map(new Text("/src/file"),
+            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            context);
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test failed: " + e.getMessage());
+    }
+  }
+
+  private void doTestIgnoreFailures(boolean ignoreFailures) {
+    try {
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      configuration.setBoolean(
+              DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(),ignoreFailures);
+      configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+              true);
+      configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+              true);
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        if (!fileStatus.isDirectory()) {
+          fs.delete(path, true);
+          copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                  fileStatus, context);
+        }
+      }
+      if (ignoreFailures) {
+        for (Text value : stubContext.getWriter().values()) {
+          Assert.assertTrue(value.toString() + " is not failed", value.toString().startsWith("FAIL:"));
+        }
+      }
+      Assert.assertTrue("There should have been an exception.", ignoreFailures);
+    }
+    catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e.getMessage(),
+              !ignoreFailures);
+      e.printStackTrace();
+    }
+  }
+
+  private static void deleteState() throws IOException {
+    pathList.clear();
+    nFiles = 0;
+    cluster.getFileSystem().delete(new Path(SOURCE_PATH), true);
+    cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
+  }
+
+  @Test
+  public void testPreserveBlockSizeAndReplication() {
+    testPreserveBlockSizeAndReplicationImpl(true);
+    testPreserveBlockSizeAndReplicationImpl(false);
+  }
+
+  private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
+    try {
+
+      deleteState();
+      createSourceData();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+              = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      if (preserve) {
+        fileAttributes.add(DistCpOptions.FileAttribute.BLOCKSIZE);
+        fileAttributes.add(DistCpOptions.FileAttribute.REPLICATION);
+      }
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+              DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fileStatus, context);
+      }
+
+      // Check that the block-size/replication are preserved (only) when requested.
+      for (Path path : pathList) {
+        final Path targetPath = new Path(path.toString()
+                .replaceAll(SOURCE_PATH, TARGET_PATH));
+        final FileStatus source = fs.getFileStatus(path);
+        final FileStatus target = fs.getFileStatus(targetPath);
+        if (!source.isDirectory() ) {
+          Assert.assertTrue(preserve ||
+                  source.getBlockSize() != target.getBlockSize());
+          Assert.assertTrue(preserve ||
+                  source.getReplication() != target.getReplication());
+          Assert.assertTrue(!preserve ||
+                  source.getBlockSize() == target.getBlockSize());
+          Assert.assertTrue(!preserve ||
+                  source.getReplication() == target.getReplication());
+        }
+      }
+    }
+    catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e.getMessage(), false);
+      e.printStackTrace();
+    }
+  }
+
+  private static void changeUserGroup(String user, String group)
+          throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    FsPermission changedPermission = new FsPermission(
+            FsAction.ALL, FsAction.ALL, FsAction.ALL
+    );
+    for (Path path : pathList)
+      if (fs.isFile(path)) {
+        fs.setOwner(path, user, group);
+        fs.setPermission(path, changedPermission);
+      }
+  }
+
+  /**
+   * If a single file is being copied to a location where the file (of the same
+   * name) already exists, then the file shouldn't be skipped.
+   */
+  @Test
+  public void testSingleFileCopy() {
+    try {
+      deleteState();
+      touchFile(SOURCE_PATH + "/1");
+      Path sourceFilePath = pathList.get(0);
+      Path targetFilePath = new Path(sourceFilePath.toString().replaceAll(
+              SOURCE_PATH, TARGET_PATH));
+      touchFile(targetFilePath.toString());
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      context.getConfiguration().set(
+              DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+              targetFilePath.getParent().toString()); // Parent directory.
+      copyMapper.setup(context);
+
+      final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
+
+      long before = fs.getFileStatus(targetFilePath).getModificationTime();
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+              new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+      long after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+      Assert.assertTrue("File should have been skipped", before == after);
+
+      context.getConfiguration().set(
+              DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+              targetFilePath.toString()); // Specify the file path.
+      copyMapper.setup(context);
+
+      before = fs.getFileStatus(targetFilePath).getModificationTime();
+      try { Thread.sleep(2); } catch (Throwable ignore) {}
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+              new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+      after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+      Assert.assertTrue("File should have been overwritten.", before < after);
+
+    } catch (Exception exception) {
+      Assert.fail("Unexpected exception: " + exception.getMessage());
+      exception.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testPreserveUserGroup() {
+    testPreserveUserGroupImpl(true);
+    testPreserveUserGroupImpl(false);
+  }
+
+  private void testPreserveUserGroupImpl(boolean preserve){
+    try {
+
+      deleteState();
+      createSourceData();
+      changeUserGroup("Michael", "Corleone");
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+              = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+              = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      if (preserve) {
+        fileAttributes.add(DistCpOptions.FileAttribute.USER);
+        fileAttributes.add(DistCpOptions.FileAttribute.GROUP);
+        fileAttributes.add(DistCpOptions.FileAttribute.PERMISSION);
+      }
+
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+              DistCpUtils.packAttributes(fileAttributes));
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+                fileStatus, context);
+      }
+
+      // Check that the user/group attributes are preserved
+      // (only) as necessary.
+      for (Path path : pathList) {
+        final Path targetPath = new Path(path.toString()
+                .replaceAll(SOURCE_PATH, TARGET_PATH));
+        final FileStatus source = fs.getFileStatus(path);
+        final FileStatus target = fs.getFileStatus(targetPath);
+        if (!source.isDirectory()) {
+          Assert.assertTrue(!preserve || source.getOwner().equals(target.getOwner()));
+          Assert.assertTrue(!preserve || source.getGroup().equals(target.getGroup()));
+          Assert.assertTrue(!preserve || source.getPermission().equals(target.getPermission()));
+          Assert.assertTrue( preserve || !source.getOwner().equals(target.getOwner()));
+          Assert.assertTrue( preserve || !source.getGroup().equals(target.getGroup()));
+          Assert.assertTrue( preserve || !source.getPermission().equals(target.getPermission()));
+          Assert.assertTrue(source.isDirectory() ||
+                  source.getReplication() != target.getReplication());
+        }
+      }
+    }
+    catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e.getMessage(), false);
+      e.printStackTrace();
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+public class TestCopyOutputFormat {
+  private static final Log LOG = LogFactory.getLog(TestCopyOutputFormat.class);
+
+  @Test
+  public void testSetCommitDirectory() {
+    try {
+      Job job = Job.getInstance(new Configuration());
+      Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, "");
+      Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+      Path directory = new Path("/tmp/test");
+      CopyOutputFormat.setCommitDirectory(job, directory);
+      Assert.assertEquals(directory, CopyOutputFormat.getCommitDirectory(job));
+      Assert.assertEquals(directory.toString(), job.getConfiguration().
+          get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running test", e);
+      Assert.fail("Failed while testing for set Commit Directory");
+    }
+  }
+
+  @Test
+  public void testSetWorkingDirectory() {
+    try {
+      Job job = Job.getInstance(new Configuration());
+      Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+      Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+      Path directory = new Path("/tmp/test");
+      CopyOutputFormat.setWorkingDirectory(job, directory);
+      Assert.assertEquals(directory, CopyOutputFormat.getWorkingDirectory(job));
+      Assert.assertEquals(directory.toString(), job.getConfiguration().
+          get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running test", e);
+      Assert.fail("Failed while testing for set Working Directory");
+    }
+  }
+
+  @Test
+  public void testGetOutputCommitter() {
+    try {
+      TaskAttemptContext context = new TaskAttemptContextImpl(new Configuration(),
+        new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
+      context.getConfiguration().set("mapred.output.dir", "/out");
+      Assert.assertTrue(new CopyOutputFormat().getOutputCommitter(context) instanceof CopyCommitter);
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Unable to get output committer");
+    }
+  }
+
+  @Test
+  public void testCheckOutputSpecs() {
+    try {
+      OutputFormat outputFormat = new CopyOutputFormat();
+      Job job = Job.getInstance(new Configuration());
+      JobID jobID = new JobID("200707121733", 1);
+
+      try {
+        JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+        Assert.fail("No checking for invalid work/commit path");
+      } catch (IllegalStateException ignore) { }
+
+      CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+      try {
+        JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+        Assert.fail("No checking for invalid commit path");
+      } catch (IllegalStateException ignore) { }
+
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+      CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+      try {
+        JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+        Assert.fail("No checking for invalid work path");
+      } catch (IllegalStateException ignore) { }
+
+      CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+      CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+      try {
+        JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+        outputFormat.checkOutputSpecs(context);
+      } catch (IllegalStateException ignore) {
+        Assert.fail("Output spec check failed.");
+      }
+
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing checkoutput specs", e);
+      Assert.fail("Checkoutput Spec failure");
+    } catch (InterruptedException e) {
+      LOG.error("Exception encountered while testing checkoutput specs", e);
+      Assert.fail("Checkoutput Spec failure");
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformSizeInputFormat {
+  private static final Log LOG
+                = LogFactory.getLog(TestUniformSizeInputFormat.class);
+
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE=1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    final DistCpOptions distCpOptions = new DistCpOptions(sourceList, targetPath);
+    distCpOptions.setMaxMaps(nMaps);
+    return distCpOptions;
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanup(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpOptions options = getOptions(nMaps);
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks",
+                      String.valueOf(options.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_1/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, options).
+        buildListing(listFile, options);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformSizeInputFormat uniformSizeInputFormat = new UniformSizeInputFormat();
+    List<InputSplit> splits
+            = uniformSizeInputFormat.getSplits(jobContext);
+
+    List<InputSplit> legacySplits = legacyGetSplits(listFile, nMaps);
+
+    int sizePerMap = totalFileSize/nMaps;
+
+    checkSplits(listFile, splits);
+    checkAgainstLegacy(splits, legacySplits);
+
+    int doubleCheckedTotalSize = 0;
+    int previousSplitSize = -1;
+    for (int i=0; i<splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
+              split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      while (recordReader.nextKeyValue()) {
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        Assert.assertEquals(fileStatus.length, 1);
+        currentSplitSize += fileStatus[0].getLen();
+      }
+      Assert.assertTrue(
+           previousSplitSize == -1
+               || Math.abs(currentSplitSize - previousSplitSize) < 0.1*sizePerMap
+               || i == splits.size()-1);
+
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+  }
+
+  // From
+  // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
+  private List<InputSplit> legacyGetSplits(Path listFile, int numSplits)
+      throws IOException {
+
+    FileSystem fs = cluster.getFileSystem();
+    FileStatus srcst = fs.getFileStatus(listFile);
+    Configuration conf = fs.getConf();
+
+    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+    FileStatus value = new FileStatus();
+    Text key = new Text();
+    final long targetsize = totalFileSize / numSplits;
+    long pos = 0L;
+    long last = 0L;
+    long acc = 0L;
+    long cbrem = srcst.getLen();
+    SequenceFile.Reader sl = null;
+
+    LOG.info("Average bytes per map: " + targetsize +
+        ", Number of maps: " + numSplits + ", total size: " + totalFileSize);
+
+    try {
+      sl = new SequenceFile.Reader(conf, SequenceFile.Reader.file(listFile));
+      for (; sl.next(key, value); last = sl.getPosition()) {
+        // if adding this file would push the current split past the target size,
+        // close off the current split and start this file in the next split.
+        if (acc + value.getLen() > targetsize && acc != 0) {
+          long splitsize = last - pos;
+          FileSplit fileSplit = new FileSplit(listFile, pos, splitsize, null);
+          LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + splitsize);
+          splits.add(fileSplit);
+          cbrem -= splitsize;
+          pos = last;
+          acc = 0L;
+        }
+        acc += value.getLen();
+      }
+    }
+    finally {
+      IOUtils.closeStream(sl);
+    }
+    if (cbrem != 0) {
+      FileSplit fileSplit = new FileSplit(listFile, pos, cbrem, null);
+      LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + cbrem);
+      splits.add(fileSplit);
+    }
+
+    return splits;
+  }
+
+  private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+    long lastEnd = 0;
+
+    //Verify that each split starts where the previous split ended,
+    //so that no part of the listing file is missed
+    for (InputSplit split : splits) {
+      FileSplit fileSplit = (FileSplit) split;
+      long start = fileSplit.getStart();
+      Assert.assertEquals(lastEnd, start);
+      lastEnd = start + fileSplit.getLength();
+    }
+
+    //Verify there is nothing more to read from the input file
+    SequenceFile.Reader reader
+            = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
+                    SequenceFile.Reader.file(listFile));
+
+    try {
+      reader.seek(lastEnd);
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+      Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+
+  private void checkAgainstLegacy(List<InputSplit> splits,
+                                  List<InputSplit> legacySplits)
+      throws IOException, InterruptedException {
+
+    Assert.assertEquals(legacySplits.size(), splits.size());
+    for (int index = 0; index < splits.size(); index++) {
+      FileSplit fileSplit = (FileSplit) splits.get(index);
+      FileSplit legacyFileSplit = (FileSplit) legacySplits.get(index);
+      Assert.assertEquals(fileSplit.getStart(), legacyFileSplit.getStart());
+    }
+  }
+
+  @Test
+  public void testGetSplits() throws Exception {
+    testGetSplits(9);
+    for (int i=1; i<N_FILES; ++i)
+      testGetSplits(i);
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.security.Credentials;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDynamicInputFormat {
+  private static final Log LOG = LogFactory.getLog(TestDynamicInputFormat.class);
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 1000;
+  private static final int NUM_SPLITS = 7;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  private static List<String> expectedFilePaths = new ArrayList<String>(N_FILES);
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(getConfigurationForCluster())
+                  .numDataNodes(1).format(true).build();
+
+    for (int i=0; i<N_FILES; ++i)
+      createFile("/tmp/source/" + String.valueOf(i));
+
+  }
+
+  private static Configuration getConfigurationForCluster() {
+    Configuration configuration = new Configuration();
+    System.setProperty("test.build.data",
+                       "target/tmp/build/TEST_DYNAMIC_INPUT_FORMAT/data");
+    configuration.set("hadoop.log.dir", "target/tmp");
+    LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+    return configuration;
+  }
+
+  private static DistCpOptions getOptions() throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+            + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+            + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    DistCpOptions options = new DistCpOptions(sourceList, targetPath);
+    options.setMaxMaps(NUM_SPLITS);
+    return options;
+  }
+
+  private static void createFile(String path) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      expectedFilePaths.add(fileSystem.listStatus(
+                                    new Path(path))[0].getPath().toString());
+    }
+    finally {
+      IOUtils.cleanup(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testGetSplits() throws Exception {
+    DistCpOptions options = getOptions();
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks",
+                      String.valueOf(options.getMaxMaps()));
+    CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
+            new Path(cluster.getFileSystem().getUri().toString()
+                    +"/tmp/testDynInputFormat/fileList.seq"), options);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    DynamicInputFormat<Text, FileStatus> inputFormat =
+        new DynamicInputFormat<Text, FileStatus>();
+    List<InputSplit> splits = inputFormat.getSplits(jobContext);
+
+    int nFiles = 0;
+    int taskId = 0;
+
+    for (InputSplit split : splits) {
+      RecordReader<Text, FileStatus> recordReader =
+           inputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, taskId);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      
+      recordReader.initialize(splits.get(0), taskAttemptContext);
+      float previousProgressValue = 0f;
+      while (recordReader.nextKeyValue()) {
+        FileStatus fileStatus = recordReader.getCurrentValue();
+        String source = fileStatus.getPath().toString();
+        System.out.println(source);
+        Assert.assertTrue(expectedFilePaths.contains(source));
+        final float progress = recordReader.getProgress();
+        Assert.assertTrue(progress >= previousProgressValue);
+        Assert.assertTrue(progress >= 0.0f);
+        Assert.assertTrue(progress <= 1.0f);
+        previousProgressValue = progress;
+        ++nFiles;
+      }
+      Assert.assertTrue(recordReader.getProgress() == 1.0f);
+
+      ++taskId;
+    }
+
+    Assert.assertEquals(expectedFilePaths.size(), nFiles);
+  }
+
+  @Test
+  public void testGetSplitRatio() throws Exception {
+    Assert.assertEquals(1, DynamicInputFormat.getSplitRatio(1, 1000000000));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
+    Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.Stack;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class TestDistCpUtils {
+  private static final Log LOG = LogFactory.getLog(TestDistCpUtils.class);
+
+  private static final Configuration config = new Configuration();
+  private static MiniDFSCluster cluster;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+                                                .build(); 
+  }
+
+  @AfterClass
+  public static void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGetRelativePathRoot() {
+    Path root = new Path("/tmp/abc");
+    Path child = new Path("/tmp/abc/xyz/file");
+    Assert.assertEquals(DistCpUtils.getRelativePath(root, child), "/xyz/file");
+
+    root = new Path("/");
+    child = new Path("/a");
+    Assert.assertEquals(DistCpUtils.getRelativePath(root, child), "/a");
+  }
+
+  @Test
+  public void testPackAttributes() {
+    EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "");
+
+    attributes.add(FileAttribute.REPLICATION);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "R");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("R"));
+
+    attributes.add(FileAttribute.BLOCKSIZE);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RB");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RB"));
+
+    attributes.add(FileAttribute.USER);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBU");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBU"));
+
+    attributes.add(FileAttribute.GROUP);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBUG");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUG"));
+
+    attributes.add(FileAttribute.PERMISSION);
+    Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBUGP");
+    Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUGP"));
+  }
+
+  @Test
+  public void testPreserve() {
+    try {
+      FileSystem fs = FileSystem.get(config);
+      EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+
+
+      Path path = new Path("/tmp/abc");
+      Path src = new Path("/tmp/src");
+      fs.mkdirs(path);
+      fs.mkdirs(src);
+      FileStatus srcStatus = fs.getFileStatus(src);
+
+      FsPermission noPerm = new FsPermission((short) 0);
+      fs.setPermission(path, noPerm);
+      fs.setOwner(path, "nobody", "nobody");
+
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      FileStatus target = fs.getFileStatus(path);
+      Assert.assertEquals(target.getPermission(), noPerm);
+      Assert.assertEquals(target.getOwner(), "nobody");
+      Assert.assertEquals(target.getGroup(), "nobody");
+
+      attributes.add(FileAttribute.PERMISSION);
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      target = fs.getFileStatus(path);
+      Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
+      Assert.assertEquals(target.getOwner(), "nobody");
+      Assert.assertEquals(target.getGroup(), "nobody");
+
+      attributes.add(FileAttribute.GROUP);
+      attributes.add(FileAttribute.USER);
+      DistCpUtils.preserve(fs, path, srcStatus, attributes);
+      target = fs.getFileStatus(path);
+      Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
+      Assert.assertEquals(target.getOwner(), srcStatus.getOwner());
+      Assert.assertEquals(target.getGroup(), srcStatus.getGroup());
+
+      fs.delete(path, true);
+      fs.delete(src, true);
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Preserve test failure");
+    }
+  }
+
+  private static Random rand = new Random();
+
+  public static String createTestSetup(FileSystem fs) throws IOException {
+    return createTestSetup("/tmp1", fs, FsPermission.getDefault());
+  }
+  
+  public static String createTestSetup(FileSystem fs,
+                                       FsPermission perm) throws IOException {
+    return createTestSetup("/tmp1", fs, perm);
+  }
+
+  public static String createTestSetup(String baseDir,
+                                       FileSystem fs,
+                                       FsPermission perm) throws IOException {
+    String base = getBase(baseDir);
+    fs.mkdirs(new Path(base + "/newTest/hello/world1"));
+    fs.mkdirs(new Path(base + "/newTest/hello/world2/newworld"));
+    fs.mkdirs(new Path(base + "/newTest/hello/world3/oldworld"));
+    fs.setPermission(new Path(base + "/newTest"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world1"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world2"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world2/newworld"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world3"), perm);
+    fs.setPermission(new Path(base + "/newTest/hello/world3/oldworld"), perm);
+    createFile(fs, base + "/newTest/1");
+    createFile(fs, base + "/newTest/hello/2");
+    createFile(fs, base + "/newTest/hello/world3/oldworld/3");
+    createFile(fs, base + "/newTest/hello/world2/4");
+    return base;
+  }
+
+  private static String getBase(String base) {
+    String location = String.valueOf(rand.nextLong());
+    return base + "/" + location;
+  }
+
+  public static void delete(FileSystem fs, String path) {
+    try {
+      if (fs != null) {
+        if (path != null) {
+          fs.delete(new Path(path), true);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception encountered ", e);
+    }
+  }
+
+  public static void createFile(FileSystem fs, String filePath) throws IOException {
+    OutputStream out = fs.create(new Path(filePath));
+    IOUtils.closeStream(out);
+  }
+
+  public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase)
+      throws IOException {
+    Path base = new Path(targetBase);
+
+    Stack<Path> stack = new Stack<Path>();
+    stack.push(base);
+    while (!stack.isEmpty()) {
+      Path file = stack.pop();
+      if (!fs.exists(file)) continue;
+      FileStatus[] fStatus = fs.listStatus(file);
+      if (fStatus == null || fStatus.length == 0) continue;
+
+      for (FileStatus status : fStatus) {
+        if (status.isDirectory()) {
+          stack.push(status.getPath());
+        }
+        Assert.assertTrue(fs.exists(new Path(sourceBase + "/" +
+            DistCpUtils.getRelativePath(new Path(targetBase), status.getPath()))));
+      }
+    }
+    return true;
+  }
+}

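As a companion to the pack/unpack assertions in TestDistCpUtils above, the round trip can
be sketched as follows (a hedged sketch: only the packAttributes/unpackAttributes
signatures, the one-letter codes R/B/U/G/P and the "RB" value asserted by the test are
assumed; it uses the same java.util.EnumSet and FileAttribute imports as the test):

    // Each attribute packs to a single letter; the packed string restores the same set.
    EnumSet<FileAttribute> attrs = EnumSet.of(FileAttribute.REPLICATION,
                                              FileAttribute.BLOCKSIZE);
    String packed = DistCpUtils.packAttributes(attrs);                    // "RB", per the test
    EnumSet<FileAttribute> restored = DistCpUtils.unpackAttributes("RB"); // {REPLICATION, BLOCKSIZE}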
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TestRetriableCommand {
+
+  private static class MyRetriableCommand extends RetriableCommand {
+
+    private int succeedAfter;
+    private int retryCount = 0;
+
+    public MyRetriableCommand(int succeedAfter) {
+      super("MyRetriableCommand");
+      this.succeedAfter = succeedAfter;
+    }
+
+    public MyRetriableCommand(int succeedAfter, RetryPolicy retryPolicy) {
+      super("MyRetriableCommand", retryPolicy);
+      this.succeedAfter = succeedAfter;
+    }
+
+    @Override
+    protected Object doExecute(Object... arguments) throws Exception {
+      if (++retryCount < succeedAfter)
+        throw new Exception("Transient failure#" + retryCount);
+      return 0;
+    }
+  }
+
+  @Test
+  public void testRetriableCommand() {
+    try {
+      new MyRetriableCommand(5).execute(0);
+      Assert.fail("Should have failed: the default policy allows fewer than 5 attempts");
+    }
+    catch (Exception e) {
+      // Expected: the command gives up before the 5th attempt can succeed.
+    }
+
+    try {
+      new MyRetriableCommand(3).execute(0);
+      // Expected to succeed within the default retry count.
+    }
+    catch (Exception e) {
+      Assert.fail("Should have succeeded within the default retry count");
+    }
+
+    try {
+      new MyRetriableCommand(5, RetryPolicies.
+          retryUpToMaximumCountWithFixedSleep(5, 0, TimeUnit.MILLISECONDS)).execute(0);
+      // Expected to succeed once 5 retries with no sleep are allowed.
+    }
+    catch (Exception e) {
+      Assert.fail("Should have succeeded with 5 retries allowed");
+    }
+  }
+}

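The pattern exercised by TestRetriableCommand above, pulled out as a usage sketch. Only
the constructors, the doExecute() override and the execute() call shown in the test are
assumed; copyOneFile, sourcePath and targetPath are hypothetical placeholders for any
work that can fail transiently:

    // Uses the same RetriableCommand and RetryPolicies classes imported by the test above.
    RetriableCommand copy = new RetriableCommand("CopyCommand",
        RetryPolicies.retryUpToMaximumCountWithFixedSleep(3, 100, TimeUnit.MILLISECONDS)) {
      @Override
      protected Object doExecute(Object... arguments) throws Exception {
        return copyOneFile(arguments);  // hypothetical helper; re-invoked on failure per the policy
      }
    };
    copy.execute(sourcePath, targetPath);  // throws only after the retry policy is exhausted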
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.*;
+
+public class TestThrottledInputStream {
+  private static final Log LOG = LogFactory.getLog(TestThrottledInputStream.class);
+  private static final int BUFF_SIZE = 1024;
+
+  private enum CB {ONE_C, BUFFER, BUFF_OFFSET}
+
+  @Test
+  public void testRead() {
+    File tmpFile;
+    File outFile;
+    try {
+      tmpFile = createFile(1024);
+      outFile = createFile();
+
+      tmpFile.deleteOnExit();
+      outFile.deleteOnExit();
+
+      long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER);
+
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER);
+/*
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFFER);
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFFER);
+*/
+
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFF_OFFSET);
+/*
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFF_OFFSET);
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFF_OFFSET);
+*/
+
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.ONE_C);
+/*
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.ONE_C);
+      copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.ONE_C);
+*/
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("testRead failed with an I/O exception: " + e.getMessage());
+    }
+  }
+
+  private long copyAndAssert(File tmpFile, File outFile,
+                             long maxBandwidth, float factor,
+                             int sleepTime, CB flag) throws IOException {
+    long bandwidth;
+    ThrottledInputStream in;
+    long maxBPS = (long) (maxBandwidth / factor);
+
+    if (maxBandwidth == 0) {
+      in = new ThrottledInputStream(new FileInputStream(tmpFile));
+    } else {
+      in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
+    }
+    OutputStream out = new FileOutputStream(outFile);
+    try {
+      if (flag == CB.BUFFER) {
+        copyBytes(in, out, BUFF_SIZE);
+      } else if (flag == CB.BUFF_OFFSET){
+        copyBytesWithOffset(in, out, BUFF_SIZE);
+      } else {
+        copyByteByByte(in, out);
+      }
+
+      LOG.info(in);
+      bandwidth = in.getBytesPerSec();
+      Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
+      Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
+      Assert.assertTrue(in.getTotalSleepTime() >  sleepTime || in.getBytesPerSec() <= maxBPS);
+    } finally {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+    }
+    return bandwidth;
+  }
+
+  private static void copyBytesWithOffset(InputStream in, OutputStream out, int buffSize)
+    throws IOException {
+
+    byte buf[] = new byte[buffSize];
+    int bytesRead = in.read(buf, 0, buffSize);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      bytesRead = in.read(buf, 0, buffSize);
+    }
+  }
+
+  private static void copyByteByByte(InputStream in, OutputStream out)
+    throws IOException {
+
+    int ch = in.read();
+    while (ch >= 0) {
+      out.write(ch);
+      ch = in.read();
+    }
+  }
+
+  private static void copyBytes(InputStream in, OutputStream out, int buffSize)
+    throws IOException {
+
+    byte buf[] = new byte[buffSize];
+    int bytesRead = in.read(buf);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      bytesRead = in.read(buf);
+    }
+  }
+
+  private File createFile(long sizeInKB) throws IOException {
+    File tmpFile = createFile();
+    writeToFile(tmpFile, sizeInKB);
+    return tmpFile;
+  }
+
+  private File createFile() throws IOException {
+    return File.createTempFile("tmp", "dat");
+  }
+
+  private void writeToFile(File tmpFile, long sizeInKB) throws IOException {
+    OutputStream out = new FileOutputStream(tmpFile);
+    try {
+      byte[] buffer = new byte [1024];
+      for (long index = 0; index < sizeInKB; index++) {
+        out.write(buffer);
+      }
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+}

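As exercised above, ThrottledInputStream wraps any InputStream and throttles reads to a
bytes-per-second cap. A minimal usage sketch, assuming only the constructor and getters
the test calls (the file path and the 1 MB/s cap are illustrative):

    InputStream raw = new FileInputStream("/tmp/sample.dat");              // illustrative path
    ThrottledInputStream in = new ThrottledInputStream(raw, 1024 * 1024);  // cap at roughly 1 MB/s
    // Read from 'in' exactly as from any InputStream; it sleeps as needed to stay under the cap.
    long observedRate = in.getBytesPerSec();    // observed throughput so far
    long totalRead    = in.getTotalBytesRead(); // bytes delivered to the caller
    long slept        = in.getTotalSleepTime(); // time spent throttling, useful for diagnostics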
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+<property>
+  <name>ssl.client.truststore.location</name>
+  <value>/path/to/truststore/keys/keystore.jks</value>
+  <description>Truststore to be used by clients like distcp. Must be
+  specified.
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.truststore.password</name>
+  <value>changeit</value>
+  <description>Optional. Default value is "".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.truststore.type</name>
+  <value>jks</value>
+  <description>Optional. Default value is "jks".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.location</name>
+  <value>/path/to/keystore/keys/keystore.jks</value>
+  <description>Keystore to be used by clients like distcp. Must be
+  specified.
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.password</name>
+  <value>changeit</value>
+  <description>Optional. Default value is "".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.keypassword</name>
+  <value>changeit</value>
+  <description>Optional. Default value is "".
+  </description>
+</property>
+
+<property>
+  <name>ssl.client.keystore.type</name>
+  <value>jks</value>
+  <description>Optional. Default value is "jks".
+  </description>
+</property>
+
+</configuration>

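The keystore/truststore settings above are ordinary Hadoop configuration properties, so
the test resource can be loaded like any other configuration file. A small sketch; the
resource name matches this file, everything else is illustrative and uses only the stock
org.apache.hadoop.conf.Configuration API:

    Configuration sslConf = new Configuration(false);    // false: skip core-default/core-site
    sslConf.addResource("sslConfig.xml");                 // must be on the test classpath
    String trustStore     = sslConf.get("ssl.client.truststore.location");
    String trustStoreType = sslConf.get("ssl.client.truststore.type", "jks");  // "jks" default, per the description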
Modified: hadoop/common/trunk/hadoop-tools/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/pom.xml?rev=1236045&r1=1236044&r2=1236045&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/pom.xml (original)
+++ hadoop/common/trunk/hadoop-tools/pom.xml Thu Jan 26 06:36:52 2012
@@ -29,6 +29,7 @@
 
   <modules>
     <module>hadoop-streaming</module>
+    <module>hadoop-distcp</module>
     <module>hadoop-archives</module>
     <module>hadoop-rumen</module>
     <module>hadoop-tools-dist</module>