Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:50:03 UTC

svn commit: r1236049 [4/6] - in /hadoop/common/branches/branch-0.23: hadoop-project/ hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/ hadoop-tools/hadoop-distcp/src/main/ hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/h...

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+
+public class StubContext {
+
+  private StubStatusReporter reporter = new StubStatusReporter();
+  private RecordReader<Text, FileStatus> reader;
+  private StubInMemoryWriter writer = new StubInMemoryWriter();
+  private Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
+
+  public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
+                     int taskId) throws IOException, InterruptedException {
+
+    WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper
+            = new WrappedMapper<Text, FileStatus, Text, Text>();
+
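+    // Build a concrete MapContextImpl and wrap it so the code under test sees
+    // an ordinary Mapper.Context; the OutputCommitter and InputSplit slots
+    // are not exercised by these tests, so nulls are passed for them.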
+    MapContextImpl<Text, FileStatus, Text, Text> contextImpl
+            = new MapContextImpl<Text, FileStatus, Text, Text>(conf,
+            getTaskAttemptID(taskId), reader, writer,
+            null, reporter, null);
+
+    this.reader = reader;
+    this.mapperContext = wrappedMapper.getMapContext(contextImpl);
+  }
+
+  public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
+    return mapperContext;
+  }
+
+  public StatusReporter getReporter() {
+    return reporter;
+  }
+
+  public RecordReader<Text, FileStatus> getReader() {
+    return reader;
+  }
+
+  public StubInMemoryWriter getWriter() {
+    return writer;
+  }
+
+  public static class StubStatusReporter extends StatusReporter {
+
+    private Counters counters = new Counters();
+
+    public StubStatusReporter() {
+      /*
+      final CounterGroup counterGroup
+              = new CounterGroup("FileInputFormatCounters",
+                                 "FileInputFormatCounters");
+      counterGroup.addCounter(new Counter("BYTES_READ",
+                                          "BYTES_READ",
+                                          0));
+      counters.addGroup(counterGroup);
+      */
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return counters.findCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return counters.findCounter(group, name);
+    }
+
+    @Override
+    public void progress() {}
+
+    @Override
+    public float getProgress() {
+      return 0F;
+    }
+
+    @Override
+    public void setStatus(String status) {}
+  }
+
+
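+  /**
+   * A RecordWriter that buffers the emitted keys and values in memory, so
+   * that tests can assert directly on what the mapper wrote.
+   */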
+  public static class StubInMemoryWriter extends RecordWriter<Text, Text> {
+
+    List<Text> keys = new ArrayList<Text>();
+
+    List<Text> values = new ArrayList<Text>();
+
+    @Override
+    public void write(Text key, Text value) throws IOException, InterruptedException {
+      keys.add(key);
+      values.add(value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    }
+
+    public List<Text> keys() {
+      return keys;
+    }
+
+    public List<Text> values() {
+      return values;
+    }
+
+  }
+
+  public static TaskAttemptID getTaskAttemptID(int taskId) {
+    return new TaskAttemptID("", 0, TaskType.MAP, taskId, 0);
+  }
+}
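
As a quick orientation for reviewers, a test would drive this harness roughly
as follows; the recordReader, mapper and fileStatus names here are assumptions
for illustration, not part of this commit:

    StubContext stubContext = new StubContext(new Configuration(), recordReader, 0);
    Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
    mapper.map(new Text("/file1"), fileStatus, context);      // exercise the mapper
    List<Text> emittedKeys = stubContext.getWriter().keys();  // inspect its output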

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestCopyListing extends SimpleCopyListing {
+  private static final Log LOG = LogFactory.getLog(TestCopyListing.class);
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  private static final Configuration config = new Configuration();
+  private static MiniDFSCluster cluster;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+                                                .build();
+  }
+
+  @AfterClass
+  public static void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+  
+  public TestCopyListing() {
+    super(config, CREDENTIALS);
+  }
+
+  protected TestCopyListing(Configuration configuration) {
+    super(configuration, CREDENTIALS);
+  }
+
+  @Override
+  protected long getBytesToCopy() {
+    return 0;
+  }
+
+  @Override
+  protected long getNumberOfPaths() {
+    return 0;
+  }
+
+  @Test
+  public void testMultipleSrcToFile() {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<Path>();
+      srcPaths.add(new Path("/tmp/in/1"));
+      srcPaths.add(new Path("/tmp/in/2"));
+      Path target = new Path("/tmp/out/1");
+      TestDistCpUtils.createFile(fs, "/tmp/in/1");
+      TestDistCpUtils.createFile(fs, "/tmp/in/2");
+      fs.mkdirs(target);
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      validatePaths(options);
+      TestDistCpUtils.delete(fs, "/tmp");
+      //No errors
+
+      target = new Path("/tmp/out/1");
+      fs.create(target).close();
+      options = new DistCpOptions(srcPaths, target);
+      try {
+        validatePaths(options);
+        Assert.fail("Invalid inputs accepted");
+      } catch (InvalidInputException ignore) { }
+      TestDistCpUtils.delete(fs, "/tmp");
+
+      srcPaths.clear();
+      srcPaths.add(new Path("/tmp/in/1"));
+      fs.mkdirs(new Path("/tmp/in/1"));
+      target = new Path("/tmp/out/1");
+      fs.create(target).close();
+      options = new DistCpOptions(srcPaths, target);
+      try {
+        validatePaths(options);
+        Assert.fail("Invalid inputs accepted");
+      } catch (InvalidInputException ignore) { }
+      TestDistCpUtils.delete(fs, "/tmp");
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test input validation failed");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testDuplicates() {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<Path>();
+      srcPaths.add(new Path("/tmp/in/*/*"));
+      TestDistCpUtils.createFile(fs, "/tmp/in/1.txt");
+      TestDistCpUtils.createFile(fs, "/tmp/in/src/1.txt");
+      Path target = new Path("/tmp/out");
+      Path listingFile = new Path("/tmp/list");
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      CopyListing listing = CopyListing.getCopyListing(getConf(), CREDENTIALS, options);
+      try {
+        listing.buildListing(listingFile, options);
+        Assert.fail("Duplicates not detected");
+      } catch (DuplicateFileException ignore) {
+      }
+    } catch (IOException e) {
+      LOG.error("Exception encountered in test", e);
+      Assert.fail("Test failed " + e.getMessage());
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testBuildListing() {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<Path>();
+      Path p1 = new Path("/tmp/in/1");
+      Path p2 = new Path("/tmp/in/2");
+      Path p3 = new Path("/tmp/in2/2");
+      Path target = new Path("/tmp/out/1");
+      srcPaths.add(p1.getParent());
+      srcPaths.add(p3.getParent());
+      TestDistCpUtils.createFile(fs, "/tmp/in/1");
+      TestDistCpUtils.createFile(fs, "/tmp/in/2");
+      TestDistCpUtils.createFile(fs, "/tmp/in2/2");
+      fs.mkdirs(target);
+      OutputStream out = fs.create(p1);
+      out.write("ABC".getBytes());
+      out.close();
+
+      out = fs.create(p2);
+      out.write("DEF".getBytes());
+      out.close();
+
+      out = fs.create(p3);
+      out.write("GHIJ".getBytes());
+      out.close();
+
+      Path listingFile = new Path("/tmp/file");
+
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      options.setSyncFolder(true);
+      CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+      try {
+        listing.buildListing(listingFile, options);
+        Assert.fail("Duplicates not detected");
+      } catch (DuplicateFileException ignore) {
+      }
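+      // The sources written above hold "ABC", "DEF" and "GHIJ":
+      // 3 + 3 + 4 = 10 bytes across 3 paths.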
+      Assert.assertEquals(listing.getBytesToCopy(), 10);
+      Assert.assertEquals(listing.getNumberOfPaths(), 3);
+      TestDistCpUtils.delete(fs, "/tmp");
+
+      try {
+        listing.buildListing(listingFile, options);
+        Assert.fail("Invalid input not detected");
+      } catch (InvalidInputException ignore) {
+      }
+      TestDistCpUtils.delete(fs, "/tmp");
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test build listing failed");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testBuildListingForSingleFile() {
+    FileSystem fs = null;
+    String testRootString = "/singleFileListing";
+    Path testRoot = new Path(testRootString);
+    SequenceFile.Reader reader = null;
+    try {
+      fs = FileSystem.get(getConf());
+      if (fs.exists(testRoot))
+        TestDistCpUtils.delete(fs, testRootString);
+
+      Path sourceFile = new Path(testRoot, "/source/foo/bar/source.txt");
+      Path decoyFile  = new Path(testRoot, "/target/moo/source.txt");
+      Path targetFile = new Path(testRoot, "/target/moo/target.txt");
+
+      TestDistCpUtils.createFile(fs, sourceFile.toString());
+      TestDistCpUtils.createFile(fs, decoyFile.toString());
+      TestDistCpUtils.createFile(fs, targetFile.toString());
+
+      List<Path> srcPaths = new ArrayList<Path>();
+      srcPaths.add(sourceFile);
+
+      DistCpOptions options = new DistCpOptions(srcPaths, targetFile);
+      CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+      final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
+      listing.buildListing(listFile, options);
+
+      reader = new SequenceFile.Reader(fs, listFile, getConf());
+      FileStatus fileStatus = new FileStatus();
+      Text relativePath = new Text();
+      Assert.assertTrue(reader.next(relativePath, fileStatus));
+      Assert.assertTrue(relativePath.toString().equals(""));
+    }
+    catch (Exception e) {
+      LOG.error("Unexpected exception: ", e);
+      Assert.fail("Unexpected exception encountered.");
+    }
+    finally {
+      TestDistCpUtils.delete(fs, testRootString);
+      IOUtils.closeStream(reader);
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.tools.mapred.CopyOutputFormat;
+import org.junit.*;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.*;
+
+@Ignore
+public class TestDistCp {
+  private static final Log LOG = LogFactory.getLog(TestDistCp.class);
+  private static List<Path> pathList = new ArrayList<Path>();
+  private static final int FILE_SIZE = 1024;
+
+  private static Configuration configuration;
+  private static MiniDFSCluster cluster;
+  private static MiniMRCluster mrCluster;
+
+  private static final String SOURCE_PATH = "/tmp/source";
+  private static final String TARGET_PATH = "/tmp/target";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    configuration = getConfigurationForCluster();
+    cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(1)
+                    .format(true).build();
+    System.setProperty("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
+    configuration.set("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
+    System.setProperty("hadoop.log.dir", "target/tmp");
+    configuration.set("hadoop.log.dir", "target/tmp");
+    mrCluster = new MiniMRCluster(1, cluster.getFileSystem().getUri().toString(), 1);
+    Configuration mrConf = mrCluster.createJobConf();
+    final String mrJobTracker = mrConf.get("mapred.job.tracker");
+    configuration.set("mapred.job.tracker", mrJobTracker);
+    final String mrJobTrackerAddress
+            = mrConf.get("mapred.job.tracker.http.address");
+    configuration.set("mapred.job.tracker.http.address", mrJobTrackerAddress);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    if (mrCluster != null) mrCluster.shutdown();
+    if (cluster != null) cluster.shutdown();
+  }
+
+  private static Configuration getConfigurationForCluster() throws IOException {
+    Configuration configuration = new Configuration();
+    System.setProperty("test.build.data", "target/build/TEST_DISTCP/data");
+    configuration.set("hadoop.log.dir", "target/tmp");
+
+    LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+    return configuration;
+  }
+
+  private static void createSourceData() throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6");
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9");
+  }
+
+  private static void mkdirs(String path) throws Exception {
+    FileSystem fileSystem = cluster.getFileSystem();
+    final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
+                                  fileSystem.getWorkingDirectory());
+    pathList.add(qualifiedPath);
+    fileSystem.mkdirs(qualifiedPath);
+  }
+
+  private static void touchFile(String path) throws Exception {
+    FileSystem fs;
+    DataOutputStream outputStream = null;
+    try {
+      fs = cluster.getFileSystem();
+      final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
+                                            fs.getWorkingDirectory());
+      final long blockSize = fs.getDefaultBlockSize() * 2;
+      outputStream = fs.create(qualifiedPath, true, 0,
+              (short)(fs.getDefaultReplication()*2),
+              blockSize);
+      outputStream.write(new byte[FILE_SIZE]);
+      pathList.add(qualifiedPath);
+    }
+    finally {
+      IOUtils.cleanup(null, outputStream);
+    }
+  }
+
+  private static void clearState() throws Exception {
+    pathList.clear();
+    cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
+    createSourceData();
+  }
+
+//  @Test
+  public void testUniformSizeDistCp() throws Exception {
+    try {
+      clearState();
+      final FileSystem fileSystem = cluster.getFileSystem();
+      Path sourcePath = new Path(SOURCE_PATH)
+              .makeQualified(fileSystem.getUri(),
+                             fileSystem.getWorkingDirectory());
+      List<Path> sources = new ArrayList<Path>();
+      sources.add(sourcePath);
+
+      Path targetPath = new Path(TARGET_PATH)
+              .makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+      DistCpOptions options = new DistCpOptions(sources, targetPath);
+      options.setAtomicCommit(true);
+      options.setBlocking(false);
+      Job job = new DistCp(configuration, options).execute();
+      Path workDir = CopyOutputFormat.getWorkingDirectory(job);
+      Path finalDir = CopyOutputFormat.getCommitDirectory(job);
+
+      while (!job.isComplete()) {
+        if (cluster.getFileSystem().exists(workDir)) {
+          break;
+        }
+      }
+      job.waitForCompletion(true);
+      Assert.assertFalse(cluster.getFileSystem().exists(workDir));
+      Assert.assertTrue(cluster.getFileSystem().exists(finalDir));
+      Assert.assertFalse(cluster.getFileSystem().exists(
+          new Path(job.getConfiguration().get(DistCpConstants.CONF_LABEL_META_FOLDER))));
+      verifyResults();
+    }
+    catch (Exception e) {
+      LOG.error("Exception encountered", e);
+      Assert.fail("Unexpected exception: " + e.getMessage());
+    }
+  }
+
+//  @Test
+  public void testCleanup() {
+    try {
+      clearState();
+      Path sourcePath = new Path("noscheme:///file");
+      List<Path> sources = new ArrayList<Path>();
+      sources.add(sourcePath);
+
+      final FileSystem fs = cluster.getFileSystem();
+      Path targetPath = new Path(TARGET_PATH)
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+      DistCpOptions options = new DistCpOptions(sources, targetPath);
+
+      Path stagingDir = JobSubmissionFiles.getStagingDir(
+              new Cluster(configuration), configuration);
+      stagingDir.getFileSystem(configuration).mkdirs(stagingDir);
+
+      try {
+        new DistCp(configuration, options).execute();
+      } catch (Throwable t) {
+        Assert.assertEquals(stagingDir.getFileSystem(configuration).
+            listStatus(stagingDir).length, 0);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("testCleanup failed " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRootPath() throws Exception {
+    try {
+      clearState();
+      List<Path> sources = new ArrayList<Path>();
+      final FileSystem fs = cluster.getFileSystem();
+      sources.add(new Path("/a")
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+      sources.add(new Path("/b")
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+      touchFile("/a/a.txt");
+      touchFile("/b/b.txt");
+
+      Path targetPath = new Path("/c")
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+      DistCpOptions options = new DistCpOptions(sources, targetPath);
+      new DistCp(configuration, options).execute();
+      Assert.assertTrue(fs.exists(new Path("/c/a/a.txt")));
+      Assert.assertTrue(fs.exists(new Path("/c/b/b.txt")));
+    }
+    catch (Exception e) {
+      LOG.error("Exception encountered", e);
+      Assert.fail("Unexpected exception: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDynamicDistCp() throws Exception {
+    try {
+      clearState();
+      final FileSystem fs = cluster.getFileSystem();
+      Path sourcePath = new Path(SOURCE_PATH)
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+      List<Path> sources = new ArrayList<Path>();
+      sources.add(sourcePath);
+
+      Path targetPath = new Path(TARGET_PATH)
+              .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+      DistCpOptions options = new DistCpOptions(sources, targetPath);
+      options.setCopyStrategy("dynamic");
+
+      options.setAtomicCommit(true);
+      options.setAtomicWorkPath(new Path("/work"));
+      options.setBlocking(false);
+      Job job = new DistCp(configuration, options).execute();
+      Path workDir = CopyOutputFormat.getWorkingDirectory(job);
+      Path finalDir = CopyOutputFormat.getCommitDirectory(job);
+
+      while (!job.isComplete()) {
+        if (fs.exists(workDir)) {
+          break;
+        }
+      }
+      job.waitForCompletion(true);
+      Assert.assertFalse(fs.exists(workDir));
+      Assert.assertTrue(fs.exists(finalDir));
+
+      verifyResults();
+    }
+    catch (Exception e) {
+      LOG.error("Exception encountered", e);
+      Assert.fail("Unexpected exception: " + e.getMessage());
+    }
+  }
+
+  private static void verifyResults() throws Exception {
+    for (Path path : pathList) {
+      FileSystem fs = cluster.getFileSystem();
+
+      Path sourcePath = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+      Path targetPath
+              = new Path(sourcePath.toString().replaceAll(SOURCE_PATH, TARGET_PATH));
+
+      Assert.assertTrue(fs.exists(targetPath));
+      Assert.assertEquals(fs.isFile(sourcePath), fs.isFile(targetPath));
+    }
+  }
+}
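
A note on the pattern used above: with blocking disabled, execute() submits
the job and returns it without waiting, which is what lets these tests poll
the atomic-commit work directory while the job runs:

    options.setBlocking(false);
    Job job = new DistCp(configuration, options).execute(); // returns the submitted job
    job.waitForCompletion(true);                            // block only when ready

The assertions that follow then pin down the atomic-commit contract: the
temporary work directory must be gone and the commit directory present.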

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.apache.hadoop.security.Credentials;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestFileBasedCopyListing {
+  private static final Log LOG = LogFactory.getLog(TestFileBasedCopyListing.class);
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  private static final Configuration config = new Configuration();
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+                                                .build();
+    fs = cluster.getFileSystem();
+    buildExpectedValuesMap();
+  }
+
+  @AfterClass
+  public static void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private static Map<String, String> map = new HashMap<String, String>();
+
+  private static void buildExpectedValuesMap() {
+    map.put("/file1", "/tmp/singlefile1/file1");
+    map.put("/file2", "/tmp/singlefile2/file2");
+    map.put("/file3", "/tmp/multifile/file3");
+    map.put("/file4", "/tmp/multifile/file4");
+    map.put("/file5", "/tmp/multifile/file5");
+    map.put("/multifile/file3", "/tmp/multifile/file3");
+    map.put("/multifile/file4", "/tmp/multifile/file4");
+    map.put("/multifile/file5", "/tmp/multifile/file5");
+    map.put("/Ufile3", "/tmp/Umultifile/Ufile3");
+    map.put("/Ufile4", "/tmp/Umultifile/Ufile4");
+    map.put("/Ufile5", "/tmp/Umultifile/Ufile5");
+    map.put("/dir1", "/tmp/singledir/dir1");
+    map.put("/singledir/dir1", "/tmp/singledir/dir1");
+    map.put("/dir2", "/tmp/singledir/dir2");
+    map.put("/singledir/dir2", "/tmp/singledir/dir2");
+    map.put("/Udir1", "/tmp/Usingledir/Udir1");
+    map.put("/Udir2", "/tmp/Usingledir/Udir2");
+    map.put("/dir2/file6", "/tmp/singledir/dir2/file6");
+    map.put("/singledir/dir2/file6", "/tmp/singledir/dir2/file6");
+    map.put("/file7", "/tmp/singledir1/dir3/file7");
+    map.put("/file8", "/tmp/singledir1/dir3/file8");
+    map.put("/file9", "/tmp/singledir1/dir3/file9");
+    map.put("/dir3/file7", "/tmp/singledir1/dir3/file7");
+    map.put("/dir3/file8", "/tmp/singledir1/dir3/file8");
+    map.put("/dir3/file9", "/tmp/singledir1/dir3/file9");
+    map.put("/Ufile7", "/tmp/Usingledir1/Udir3/Ufile7");
+    map.put("/Ufile8", "/tmp/Usingledir1/Udir3/Ufile8");
+    map.put("/Ufile9", "/tmp/Usingledir1/Udir3/Ufile9");
+  }
+
+  @Test
+  public void testSingleFileMissingTarget() {
+    caseSingleFileMissingTarget(false);
+    caseSingleFileMissingTarget(true);
+  }
+
+  private void caseSingleFileMissingTarget(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/singlefile1/file1");
+      createFiles("/tmp/singlefile1/file1");
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 0);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testSingleFileTargetFile() {
+    caseSingleFileTargetFile(false);
+    caseSingleFileTargetFile(true);
+  }
+
+  private void caseSingleFileTargetFile(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/singlefile1/file1");
+      createFiles("/tmp/singlefile1/file1", target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 0);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testSingleFileTargetDir() {
+    caseSingleFileTargetDir(false);
+    caseSingleFileTargetDir(true);
+  }
+
+  private void caseSingleFileTargetDir(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/singlefile2/file2");
+      createFiles("/tmp/singlefile2/file2");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testSingleDirTargetMissing() {
+    caseSingleDirTargetMissing(false);
+    caseSingleDirTargetMissing(true);
+  }
+
+  private void caseSingleDirTargetMissing(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/singledir");
+      mkdirs("/tmp/singledir/dir1");
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testSingleDirTargetPresent() {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/singledir");
+      mkdirs("/tmp/singledir/dir1");
+      mkdirs(target.toString());
+
+      runTest(listFile, target);
+
+      checkResult(listFile, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testUpdateSingleDirTargetPresent() {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/Usingledir");
+      mkdirs("/tmp/Usingledir/Udir1");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, true);
+
+      checkResult(listFile, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testMultiFileTargetPresent() {
+    caseMultiFileTargetPresent(false);
+    caseMultiFileTargetPresent(true);
+  }
+
+  private void caseMultiFileTargetPresent(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 3);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testMultiFileTargetMissing() {
+    caseMultiFileTargetMissing(false);
+    caseMultiFileTargetMissing(true);
+  }
+
+  private void caseMultiFileTargetMissing(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 3);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testMultiDirTargetPresent() {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/multifile", "/tmp/singledir");
+      createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      mkdirs(target.toString(), "/tmp/singledir/dir1");
+
+      runTest(listFile, target);
+
+      checkResult(listFile, 4);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testUpdateMultiDirTargetPresent() {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/Umultifile", "/tmp/Usingledir");
+      createFiles("/tmp/Umultifile/Ufile3", "/tmp/Umultifile/Ufile4", "/tmp/Umultifile/Ufile5");
+      mkdirs(target.toString(), "/tmp/Usingledir/Udir1");
+
+      runTest(listFile, target, true);
+
+      checkResult(listFile, 4);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testMultiDirTargetMissing() {
+    caseMultiDirTargetMissing(false);
+    caseMultiDirTargetMissing(true);
+  }
+
+  private void caseMultiDirTargetMissing(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/multifile", "/tmp/singledir");
+      createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      mkdirs("/tmp/singledir/dir1");
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 4);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testGlobTargetMissingSingleLevel() {
+    caseGlobTargetMissingSingleLevel(false);
+    caseGlobTargetMissingSingleLevel(true);
+  }
+
+  private void caseGlobTargetMissingSingleLevel(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp1/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/*");
+      createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      createFiles("/tmp/singledir/dir2/file6");
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 5);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+      TestDistCpUtils.delete(fs, "/tmp1");
+    }
+  }
+
+  @Test
+  public void testGlobTargetMissingMultiLevel() {
+    caseGlobTargetMissingMultiLevel(false);
+    caseGlobTargetMissingMultiLevel(true);
+  }
+
+  private void caseGlobTargetMissingMultiLevel(boolean sync) {
+
+    try {
+      Path listFile = new Path("/tmp1/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/*/*");
+      createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      createFiles("/tmp/singledir1/dir3/file7", "/tmp/singledir1/dir3/file8",
+          "/tmp/singledir1/dir3/file9");
+
+      runTest(listFile, target, sync);
+
+      checkResult(listFile, 6);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+      TestDistCpUtils.delete(fs, "/tmp1");
+    }
+  }
+
+  @Test
+  public void testGlobTargetDirMultiLevel() {
+
+    try {
+      Path listFile = new Path("/tmp1/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/*/*");
+      createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+      createFiles("/tmp/singledir1/dir3/file7", "/tmp/singledir1/dir3/file8",
+          "/tmp/singledir1/dir3/file9");
+      mkdirs(target.toString());
+
+      runTest(listFile, target);
+
+      checkResult(listFile, 6);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+      TestDistCpUtils.delete(fs, "/tmp1");
+    }
+  }
+
+  @Test
+  public void testUpdateGlobTargetDirMultiLevel() {
+
+    try {
+      Path listFile = new Path("/tmp1/listing");
+      Path target = new Path("/tmp/target");
+
+      addEntries(listFile, "/tmp/*/*");
+      createFiles("/tmp/Umultifile/Ufile3", "/tmp/Umultifile/Ufile4", "/tmp/Umultifile/Ufile5");
+      createFiles("/tmp/Usingledir1/Udir3/Ufile7", "/tmp/Usingledir1/Udir3/Ufile8",
+          "/tmp/Usingledir1/Udir3/Ufile9");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, true);
+
+      checkResult(listFile, 6);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing build listing", e);
+      Assert.fail("build listing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+      TestDistCpUtils.delete(fs, "/tmp1");
+    }
+  }
+
+  private void addEntries(Path listFile, String... entries) throws IOException {
+    OutputStream out = fs.create(listFile);
+    try {
+      for (String entry : entries){
+        out.write(entry.getBytes());
+        out.write("\n".getBytes());
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  private void createFiles(String... entries) throws IOException {
+    for (String entry : entries){
+      OutputStream out = fs.create(new Path(entry));
+      try {
+        out.write(entry.getBytes());
+        out.write("\n".getBytes());
+      } finally {
+        out.close();
+      }
+    }
+  }
+
+  private void mkdirs(String... entries) throws IOException {
+    for (String entry : entries){
+      fs.mkdirs(new Path(entry));
+    }
+  }
+
+  private void runTest(Path listFile, Path target) throws IOException {
+    runTest(listFile, target, true);
+  }
+
+  private void runTest(Path listFile, Path target, boolean sync) throws IOException {
+    CopyListing listing = new FileBasedCopyListing(config, CREDENTIALS);
+    DistCpOptions options = new DistCpOptions(listFile, target);
+    options.setSyncFolder(sync);
+    listing.buildListing(listFile, options);
+  }
+
+  private void checkResult(Path listFile, int count) throws IOException {
+    if (count == 0) {
+      return;
+    }
+
+    int recCount = 0;
+    SequenceFile.Reader reader = new SequenceFile.Reader(config,
+                                            SequenceFile.Reader.file(listFile));
+    try {
+      Text relPath = new Text();
+      FileStatus fileStatus = new FileStatus();
+      while (reader.next(relPath, fileStatus)) {
+        Assert.assertEquals(fileStatus.getPath().toUri().getPath(), map.get(relPath.toString()));
+        recCount++;
+      }
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+    Assert.assertEquals(recCount, count);
+  }
+
+}
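
The file-based listing consumed by these cases is plain newline-separated
paths; for example, addEntries(listFile, "/tmp/multifile", "/tmp/singledir")
produces a text file containing:

    /tmp/multifile
    /tmp/singledir

FileBasedCopyListing expands those entries, and checkResult() then verifies
the resulting SequenceFile of (relative-path Text, FileStatus value) records
against the expected-values map built above.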

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.security.Credentials;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestGlobbedCopyListing {
+
+  private static MiniDFSCluster cluster;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
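+  // Maps each absolute source URI to its path relative to /tmp/source, i.e.
+  // the (FileStatus path, relative-path key) pairing the listing should emit.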
+  public static Map<String, String> expectedValues = new HashMap<String, String>();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    createSourceData();
+  }
+
+  private static void createSourceData() throws Exception {
+    mkdirs("/tmp/source/1");
+    mkdirs("/tmp/source/2");
+    mkdirs("/tmp/source/2/3");
+    mkdirs("/tmp/source/2/3/4");
+    mkdirs("/tmp/source/5");
+    touchFile("/tmp/source/5/6");
+    mkdirs("/tmp/source/7");
+    mkdirs("/tmp/source/7/8");
+    touchFile("/tmp/source/7/8/9");
+  }
+
+  private static void mkdirs(String path) throws Exception {
+    FileSystem fileSystem = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      fileSystem.mkdirs(new Path(path));
+      recordInExpectedValues(path);
+    }
+    finally {
+      IOUtils.cleanup(null, fileSystem);
+    }
+  }
+
+  private static void touchFile(String path) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      recordInExpectedValues(path);
+    }
+    finally {
+      IOUtils.cleanup(null, fileSystem, outputStream);
+    }
+  }
+
+  private static void recordInExpectedValues(String path) throws Exception {
+    FileSystem fileSystem = cluster.getFileSystem();
+    Path sourcePath = new Path(fileSystem.getUri().toString() + path);
+    expectedValues.put(sourcePath.toString(), DistCpUtils.getRelativePath(
+        new Path("/tmp/source"), sourcePath));
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    final URI uri = cluster.getFileSystem().getUri();
+    final String pathString = uri.toString();
+    Path fileSystemPath = new Path(pathString);
+    Path source = new Path(fileSystemPath.toString() + "/tmp/source");
+    Path target = new Path(fileSystemPath.toString() + "/tmp/target");
+    Path listingPath = new Path(fileSystemPath.toString() + "/tmp/META/fileList.seq");
+    DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+
+    new GlobbedCopyListing(new Configuration(), CREDENTIALS).buildListing(listingPath, options);
+
+    verifyContents(listingPath);
+  }
+
+  private void verifyContents(Path listingPath) throws Exception {
+    SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
+                                              listingPath, new Configuration());
+    Text key   = new Text();
+    FileStatus value = new FileStatus();
+    Map<String, String> actualValues = new HashMap<String, String>();
+    while (reader.next(key, value)) {
+      actualValues.put(value.getPath().toString(), key.toString());
+    }
+
+    Assert.assertEquals(expectedValues.size(), actualValues.size());
+    for (Map.Entry<String, String> entry : actualValues.entrySet()) {
+      Assert.assertEquals(entry.getValue(), expectedValues.get(entry.getKey()));
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class TestIntegration {
+  private static final Log LOG = LogFactory.getLog(TestIntegration.class);
+
+  private static FileSystem fs;
+
+  private static Path listFile;
+  private static Path target;
+  private static String root;
+
+  private static Configuration getConf() {
+    Configuration conf = new Configuration();
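+    // Run these cases against the local filesystem and the in-process local
+    // job runner; no mini-cluster is required.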
+    conf.set("fs.default.name", "file:///");
+    conf.set("mapred.job.tracker", "local");
+    return conf;
+  }
+
+  @BeforeClass
+  public static void setup() {
+    try {
+      fs = FileSystem.get(getConf());
+      listFile = new Path("target/tmp/listing").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      target = new Path("target/tmp/target").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      root = new Path("target/tmp").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory()).toString();
+      TestDistCpUtils.delete(fs, root);
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+    }
+  }
+
+  @Test
+  public void testSingleFileMissingTarget() {
+    caseSingleFileMissingTarget(false);
+    caseSingleFileMissingTarget(true);
+  }
+
+  private void caseSingleFileMissingTarget(boolean sync) {
+
+    try {
+      addEntries(listFile, "singlefile1/file1");
+      createFiles("singlefile1/file1");
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testSingleFileTargetFile() {
+    caseSingleFileTargetFile(false);
+    caseSingleFileTargetFile(true);
+  }
+
+  private void caseSingleFileTargetFile(boolean sync) {
+
+    try {
+      addEntries(listFile, "singlefile1/file1");
+      createFiles("singlefile1/file1", target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testSingleFileTargetDir() {
+    caseSingleFileTargetDir(false);
+    caseSingleFileTargetDir(true);
+  }
+
+  private void caseSingleFileTargetDir(boolean sync) {
+
+    try {
+      addEntries(listFile, "singlefile2/file2");
+      createFiles("singlefile2/file2");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1, "file2");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testSingleDirTargetMissing() {
+    caseSingleDirTargetMissing(false);
+    caseSingleDirTargetMissing(true);
+  }
+
+  private void caseSingleDirTargetMissing(boolean sync) {
+
+    try {
+      addEntries(listFile, "singledir");
+      mkdirs(root + "/singledir/dir1");
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1, "dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testSingleDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "singledir");
+      mkdirs(root + "/singledir/dir1");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 1, "singledir/dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testUpdateSingleDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "Usingledir");
+      mkdirs(root + "/Usingledir/Udir1");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 1, "Udir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testMultiFileTargetPresent() {
+    caseMultiFileTargetPresent(false);
+    caseMultiFileTargetPresent(true);
+  }
+
+  private void caseMultiFileTargetPresent(boolean sync) {
+
+    try {
+      addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 3, "file3", "file4", "file5");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testMultiFileTargetMissing() {
+    caseMultiFileTargetMissing(false);
+    caseMultiFileTargetMissing(true);
+  }
+
+  private void caseMultiFileTargetMissing(boolean sync) {
+
+    try {
+      addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 3, "file3", "file4", "file5");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
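+  // Without -update, each listed directory is recreated under the target;
+  // the two direct children here are multifile and singledir.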
+  @Test
+  public void testMultiDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "multifile", "singledir");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(target.toString(), root + "/singledir/dir1");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testUpdateMultiDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "Umultifile", "Usingledir");
+      createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5");
+      mkdirs(target.toString(), root + "/Usingledir/Udir1");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test
+  public void testMultiDirTargetMissing() {
+
+    try {
+      addEntries(listFile, "multifile", "singledir");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(root + "/singledir/dir1");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 2, "multifile/file3", "multifile/file4",
+          "multifile/file5", "singledir/dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
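+  // With -update, the listed directories' contents are flattened into
+  // the target, giving four direct children.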
+  @Test
+  public void testUpdateMultiDirTargetMissing() {
+
+    try {
+      addEntries(listFile, "multifile", "singledir");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(root + "/singledir/dir1");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 4, "file3", "file4", "file5", "dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
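+  // The glob tests qualify their own listing file under target/tmp1,
+  // presumably so the "*" entry does not match the listing itself.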
+  @Test
+  public void testGlobTargetMissingSingleLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+                                fs.getWorkingDirectory());
+      addEntries(listFile, "*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir/dir2/file6");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5",
+          "singledir/dir2/file6");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test
+  public void testUpdateGlobTargetMissingSingleLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+                                  fs.getWorkingDirectory());
+      addEntries(listFile, "*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir/dir2/file6");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 4, "file3", "file4", "file5", "dir2/file6");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test
+  public void testGlobTargetMissingMultiLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      addEntries(listFile, "*/*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
+          "singledir1/dir3/file9");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 4, "file3", "file4", "file5",
+          "dir3/file7", "dir3/file8", "dir3/file9");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test
+  public void testUpdateGlobTargetMissingMultiLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      addEntries(listFile, "*/*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
+          "singledir1/dir3/file9");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 6, "file3", "file4", "file5",
+          "file7", "file8", "file9");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
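+  // Writes each entry, prefixed with the test root, as one line of the
+  // source listing file.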
+  private void addEntries(Path listFile, String... entries) throws IOException {
+    OutputStream out = fs.create(listFile);
+    try {
+      for (String entry : entries){
+        out.write((root + "/" + entry).getBytes());
+        out.write("\n".getBytes());
+      }
+    } finally {
+      out.close();
+    }
+  }
+
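+  // Creates each file under the test root, using the file's own path as
+  // its content.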
+  private void createFiles(String... entries) throws IOException {
+    for (String entry : entries){
+      OutputStream out = fs.create(new Path(root + "/" + entry));
+      try {
+        out.write((root + "/" + entry).getBytes());
+        out.write("\n".getBytes());
+      } finally {
+        out.close();
+      }
+    }
+  }
+
+  private void mkdirs(String... entries) throws IOException {
+    for (String entry : entries){
+      fs.mkdirs(new Path(entry));
+    }
+  }
+
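+  // Runs DistCp against the listing and target; 'sync' toggles the
+  // -update (syncFolder) behaviour.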
+  private void runTest(Path listFile, Path target, boolean sync) throws IOException {
+    DistCpOptions options = new DistCpOptions(listFile, target);
+    options.setSyncFolder(sync);
+    try {
+      new DistCp(getConf(), options).execute();
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      throw new IOException(e);
+    }
+  }
+
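+  // Asserts the number of direct children under the target, then checks
+  // that each expected relative path exists.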
+  private void checkResult(Path target, int count, String... relPaths) throws IOException {
+    Assert.assertEquals(count, fs.listStatus(target).length);
+    if (relPaths == null || relPaths.length == 0) {
+      Assert.assertTrue(target.toString(), fs.exists(target));
+      return;
+    }
+    for (String relPath : relPaths) {
+      Assert.assertTrue(new Path(target, relPath).toString(), fs.exists(new Path(target, relPath)));
+    }
+  }
+
+}

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpOptions.*;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class TestOptionsParser {
+
+  @Test
+  public void testParseIgnoreFailure() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldIgnoreFailures());
+
+    options = OptionsParser.parse(new String[] {
+        "-i",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldIgnoreFailures());
+  }
+
+  @Test
+  public void testParseOverwrite() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldOverwrite());
+
+    options = OptionsParser.parse(new String[] {
+        "-overwrite",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldOverwrite());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-update",
+          "-overwrite",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Update and overwrite aren't allowed together");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testLogPath() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getLogPath());
+
+    options = OptionsParser.parse(new String[] {
+        "-log",
+        "hdfs://localhost:8020/logs",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getLogPath(), new Path("hdfs://localhost:8020/logs"));
+  }
+
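+  // -async launches the DistCp job without blocking on its completion.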
+  @Test
+  public void testParseBlocking() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldBlock());
+
+    options = OptionsParser.parse(new String[] {
+        "-async",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldBlock());
+  }
+
+  @Test
+  public void testParseBandwidth() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB);
+
+    options = OptionsParser.parse(new String[] {
+        "-bandwidth",
+        "11",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMapBandwidth(), 11);
+  }
+
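+  // -skipcrccheck is passed together with -update to skip CRC comparison
+  // when deciding which files need copying.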
+  @Test
+  public void testParseSkipCRC() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldSkipCRC());
+
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "-skipcrccheck",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldSyncFolder());
+    Assert.assertTrue(options.shouldSkipCRC());
+  }
+
+  @Test
+  public void testParseAtomicCommit() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldAtomicCommit());
+
+    options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldAtomicCommit());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-atomic",
+          "-update",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Atomic and sync folders were allowed");
+    } catch (IllegalArgumentException ignore) { }
+  }
+
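+  // -tmp supplies the atomic-commit work path and is only accepted
+  // alongside -atomic.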
+  @Test
+  public void testParseWorkPath() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getAtomicWorkPath());
+
+    options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getAtomicWorkPath());
+
+    options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "-tmp",
+        "hdfs://localhost:8020/work",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getAtomicWorkPath(), new Path("hdfs://localhost:8020/work"));
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-tmp",
+          "hdfs://localhost:8020/work",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("work path was allowed without -atomic switch");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testParseSyncFolders() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldSyncFolder());
+
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldSyncFolder());
+  }
+
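+  // -delete is accepted alongside -update or -overwrite, but is rejected
+  // in combination with -atomic.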
+  @Test
+  public void testParseDeleteMissing() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldDeleteMissing());
+
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "-delete",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldSyncFolder());
+    Assert.assertTrue(options.shouldDeleteMissing());
+
+    options = OptionsParser.parse(new String[] {
+        "-overwrite",
+        "-delete",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldOverwrite());
+    Assert.assertTrue(options.shouldDeleteMissing());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-atomic",
+          "-delete",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Atomic and delete folders were allowed");
+    } catch (IllegalArgumentException ignore) { }
+  }
+
+  @Test
+  public void testParseSSLConf() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getSslConfigurationFile());
+
+    options = OptionsParser.parse(new String[] {
+        "-mapredSslConf",
+        "/tmp/ssl-client.xml",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getSslConfigurationFile(), "/tmp/ssl-client.xml");
+  }
+
+  @Test
+  public void testParseMaps() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMaxMaps(), DistCpConstants.DEFAULT_MAPS);
+
+    options = OptionsParser.parse(new String[] {
+        "-m",
+        "1",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMaxMaps(), 1);
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-m",
+          "hello",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Non numberic map parsed");
+    } catch (IllegalArgumentException ignore) { }
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-mapredXslConf",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Non numberic map parsed");
+    } catch (IllegalArgumentException ignore) { }
+  }
+
+  @Test
+  public void testSourceListing() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getSourceFileListing(),
+        new Path("hdfs://localhost:8020/source/first"));
+  }
+
+  @Test
+  public void testSourceListingAndSourcePath() {
+    try {
+      OptionsParser.parse(new String[] {
+          "-f",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Both source listing & source paths allowed");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testMissingSourceInfo() {
+    try {
+      OptionsParser.parse(new String[] {
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Neither source listing not source paths present");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testMissingTarget() {
+    try {
+      OptionsParser.parse(new String[] {
+          "-f", "hdfs://localhost:8020/source"});
+      Assert.fail("Missing target allowed");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testInvalidArgs() {
+    try {
+      OptionsParser.parse(new String[] {
+          "-m", "-f", "hdfs://localhost:8020/source"});
+      Assert.fail("Missing map value");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testToString() {
+    DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
+    String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " +
+        "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " +
+        "sourceFileListing=abc, sourcePaths=null, targetPath=xyz}";
+    Assert.assertEquals(val, option.toString());
+    Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
+        DistCpOptionSwitch.ATOMIC_COMMIT.name());
+  }
+
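+  // -strategy selects how work is distributed across maps; the default
+  // is DistCpConstants.UNIFORMSIZE.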
+  @Test
+  public void testCopyStrategy() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-strategy",
+        "dynamic",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getCopyStrategy(), "dynamic");
+
+    options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getCopyStrategy(), DistCpConstants.UNIFORMSIZE);
+  }
+
+  @Test
+  public void testTargetPath() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost:8020/target/"));
+  }
+
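+  // Bare -p preserves every attribute; -p with flags limits preservation,
+  // e.g. -pbr keeps only block size and replication.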
+  @Test
+  public void testPreserve() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-p",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-p",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-pbr",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-pbrgup",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-p",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    int i = 0;
+    Iterator<FileAttribute> attribIterator = options.preserveAttributes();
+    while (attribIterator.hasNext()) {
+      attribIterator.next();
+      i++;
+    }
+    Assert.assertEquals(i, 5);
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-pabc",
+          "-f",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target"});
+      Assert.fail("Invalid preserve attribute");
+    } catch (IllegalArgumentException ignore) {
+    } catch (NoSuchElementException ignore) {}
+
+    options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+    options.preserve(FileAttribute.PERMISSION);
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+
+    options.preserve(FileAttribute.PERMISSION);
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+  }
+
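+  // addToConf with no explicit value records the switch as "true" under
+  // its config label.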
+  @Test
+  public void testOptionsSwitchAddToConf() {
+    Configuration conf = new Configuration();
+    Assert.assertNull(conf.get(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel()));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+  }
+
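+  // appendToConf publishes each parsed option under its config label,
+  // including defaults such as the bandwidth limit.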
+  @Test
+  public void testOptionsAppendToConf() {
+    Configuration conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "-i",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
+        DistCpConstants.DEFAULT_BANDWIDTH_MB);
+
+    conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
+    Assert.assertNull(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()));
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "-delete",
+        "-pu",
+        "-bandwidth",
+        "11",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
+    Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
+    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
+  }
+}