Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC
svn commit: r1236045 [4/5] - in /hadoop/common/trunk: hadoop-project/
hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/
hadoop-tools/hadoop-distcp/src/main/
hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.tools.mapred.CopyOutputFormat;
+import org.junit.*;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.*;
+
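+/**
+ * End-to-end DistCp tests, run against an in-process MiniDFSCluster and
+ * MiniMRCluster. Note: the whole class is @Ignore'd, so these tests are
+ * skipped by default.
+ */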
+@Ignore
+public class TestDistCp {
+ private static final Log LOG = LogFactory.getLog(TestDistCp.class);
+ private static List<Path> pathList = new ArrayList<Path>();
+ private static final int FILE_SIZE = 1024;
+
+ private static Configuration configuration;
+ private static MiniDFSCluster cluster;
+ private static MiniMRCluster mrCluster;
+
+ private static final String SOURCE_PATH = "/tmp/source";
+ private static final String TARGET_PATH = "/tmp/target";
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ configuration = getConfigurationForCluster();
+ cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(1)
+ .format(true).build();
+ System.setProperty("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
+ configuration.set("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
+ System.setProperty("hadoop.log.dir", "target/tmp");
+ configuration.set("hadoop.log.dir", "target/tmp");
+ mrCluster = new MiniMRCluster(1, cluster.getFileSystem().getUri().toString(), 1);
+ Configuration mrConf = mrCluster.createJobConf();
+ final String mrJobTracker = mrConf.get("mapred.job.tracker");
+ configuration.set("mapred.job.tracker", mrJobTracker);
+ final String mrJobTrackerAddress
+ = mrConf.get("mapred.job.tracker.http.address");
+ configuration.set("mapred.job.tracker.http.address", mrJobTrackerAddress);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ if (mrCluster != null) mrCluster.shutdown();
+ if (cluster != null) cluster.shutdown();
+ }
+
+ private static Configuration getConfigurationForCluster() throws IOException {
+ Configuration configuration = new Configuration();
+ System.setProperty("test.build.data", "target/build/TEST_DISTCP/data");
+ configuration.set("hadoop.log.dir", "target/tmp");
+
+ LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
+ LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+ return configuration;
+ }
+
+ private static void createSourceData() throws Exception {
+ mkdirs(SOURCE_PATH + "/1");
+ mkdirs(SOURCE_PATH + "/2");
+ mkdirs(SOURCE_PATH + "/2/3");
+ mkdirs(SOURCE_PATH + "/2/3/4");
+ mkdirs(SOURCE_PATH + "/5");
+ touchFile(SOURCE_PATH + "/5/6");
+ mkdirs(SOURCE_PATH + "/7");
+ mkdirs(SOURCE_PATH + "/7/8");
+ touchFile(SOURCE_PATH + "/7/8/9");
+ }
+
+ private static void mkdirs(String path) throws Exception {
+ FileSystem fileSystem = cluster.getFileSystem();
+ final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
+ fileSystem.getWorkingDirectory());
+ pathList.add(qualifiedPath);
+ fileSystem.mkdirs(qualifiedPath);
+ }
+
+ private static void touchFile(String path) throws Exception {
+ FileSystem fs;
+ DataOutputStream outputStream = null;
+ try {
+ fs = cluster.getFileSystem();
+ final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
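+ // Use double the default block size and replication factor, presumably
+ // to exercise non-default file attributes in the copy.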
+ final long blockSize = fs.getDefaultBlockSize() * 2;
+ outputStream = fs.create(qualifiedPath, true, 0,
+ (short)(fs.getDefaultReplication()*2),
+ blockSize);
+ outputStream.write(new byte[FILE_SIZE]);
+ pathList.add(qualifiedPath);
+ }
+ finally {
+ IOUtils.cleanup(null, outputStream);
+ }
+ }
+
+ private static void clearState() throws Exception {
+ pathList.clear();
+ cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
+ createSourceData();
+ }
+
+// @Test
+ public void testUniformSizeDistCp() throws Exception {
+ try {
+ clearState();
+ final FileSystem fileSystem = cluster.getFileSystem();
+ Path sourcePath = new Path(SOURCE_PATH)
+ .makeQualified(fileSystem.getUri(),
+ fileSystem.getWorkingDirectory());
+ List<Path> sources = new ArrayList<Path>();
+ sources.add(sourcePath);
+
+ Path targetPath = new Path(TARGET_PATH)
+ .makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+ DistCpOptions options = new DistCpOptions(sources, targetPath);
+ options.setAtomicCommit(true);
+ options.setBlocking(false);
+ Job job = new DistCp(configuration, options).execute();
+ Path workDir = CopyOutputFormat.getWorkingDirectory(job);
+ Path finalDir = CopyOutputFormat.getCommitDirectory(job);
+
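+ // With atomic commit, files are copied into a temporary work directory
+ // and promoted to the commit directory only on success. Poll until the
+ // work directory appears (the job has started writing), then wait for
+ // completion; the work directory should then be gone and the commit
+ // directory present.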
+ while (!job.isComplete()) {
+ if (cluster.getFileSystem().exists(workDir)) {
+ break;
+ }
+ }
+ job.waitForCompletion(true);
+ Assert.assertFalse(cluster.getFileSystem().exists(workDir));
+ Assert.assertTrue(cluster.getFileSystem().exists(finalDir));
+ Assert.assertFalse(cluster.getFileSystem().exists(
+ new Path(job.getConfiguration().get(DistCpConstants.CONF_LABEL_META_FOLDER))));
+ verifyResults();
+ }
+ catch (Exception e) {
+ LOG.error("Exception encountered", e);
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+// @Test
+ public void testCleanup() {
+ try {
+ clearState();
+ Path sourcePath = new Path("noscheme:///file");
+ List<Path> sources = new ArrayList<Path>();
+ sources.add(sourcePath);
+
+ final FileSystem fs = cluster.getFileSystem();
+ Path targetPath = new Path(TARGET_PATH)
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ DistCpOptions options = new DistCpOptions(sources, targetPath);
+
+ Path stagingDir = JobSubmissionFiles.getStagingDir(
+ new Cluster(configuration), configuration);
+ stagingDir.getFileSystem(configuration).mkdirs(stagingDir);
+
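+ // The unresolvable "noscheme" source URI makes the DistCp run fail;
+ // on failure, the job staging directory should be left empty.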
+ try {
+ new DistCp(configuration, options).execute();
+ } catch (Throwable t) {
+ Assert.assertEquals(stagingDir.getFileSystem(configuration).
+ listStatus(stagingDir).length, 0);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("testCleanup failed " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRootPath() throws Exception {
+ try {
+ clearState();
+ List<Path> sources = new ArrayList<Path>();
+ final FileSystem fs = cluster.getFileSystem();
+ sources.add(new Path("/a")
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+ sources.add(new Path("/b")
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+ touchFile("/a/a.txt");
+ touchFile("/b/b.txt");
+
+ Path targetPath = new Path("/c")
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ DistCpOptions options = new DistCpOptions(sources, targetPath);
+ new DistCp(configuration, options).execute();
+ Assert.assertTrue(fs.exists(new Path("/c/a/a.txt")));
+ Assert.assertTrue(fs.exists(new Path("/c/b/b.txt")));
+ }
+ catch (Exception e) {
+ LOG.error("Exception encountered", e);
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDynamicDistCp() throws Exception {
+ try {
+ clearState();
+ final FileSystem fs = cluster.getFileSystem();
+ Path sourcePath = new Path(SOURCE_PATH)
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ List<Path> sources = new ArrayList<Path>();
+ sources.add(sourcePath);
+
+ Path targetPath = new Path(TARGET_PATH)
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ DistCpOptions options = new DistCpOptions(sources, targetPath);
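+ // The "dynamic" strategy hands copy work to map tasks at runtime,
+ // rather than pre-assigning uniform-size chunks.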
+ options.setCopyStrategy("dynamic");
+
+ options.setAtomicCommit(true);
+ options.setAtomicWorkPath(new Path("/work"));
+ options.setBlocking(false);
+ Job job = new DistCp(configuration, options).execute();
+ Path workDir = CopyOutputFormat.getWorkingDirectory(job);
+ Path finalDir = CopyOutputFormat.getCommitDirectory(job);
+
+ while (!job.isComplete()) {
+ if (fs.exists(workDir)) {
+ break;
+ }
+ }
+ job.waitForCompletion(true);
+ Assert.assertFalse(fs.exists(workDir));
+ Assert.assertTrue(fs.exists(finalDir));
+
+ verifyResults();
+ }
+ catch (Exception e) {
+ LOG.error("Exception encountered", e);
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ private static void verifyResults() throws Exception {
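+ // Every source path recorded during setup should have a counterpart
+ // under the target path, with matching file/directory type.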
+ for (Path path : pathList) {
+ FileSystem fs = cluster.getFileSystem();
+
+ Path sourcePath = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ Path targetPath
+ = new Path(sourcePath.toString().replaceAll(SOURCE_PATH, TARGET_PATH));
+
+ Assert.assertTrue(fs.exists(targetPath));
+ Assert.assertEquals(fs.isFile(sourcePath), fs.isFile(targetPath));
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.apache.hadoop.security.Credentials;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
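+/**
+ * Tests for file-based copy-listing construction: each case writes a listing
+ * file of source paths, builds the copy listing from it (optionally in
+ * sync/update mode), and checks the record count and resolved paths against
+ * the expected-values map below.
+ */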
+public class TestFileBasedCopyListing {
+ private static final Log LOG = LogFactory.getLog(TestFileBasedCopyListing.class);
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+ private static final Configuration config = new Configuration();
+ private static MiniDFSCluster cluster;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+ .build();
+ fs = cluster.getFileSystem();
+ buildExpectedValuesMap();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private static Map<String, String> map = new HashMap<String, String>();
+
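+ /** Expected listing entries: relative path -> absolute source path. */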
+ private static void buildExpectedValuesMap() {
+ map.put("/file1", "/tmp/singlefile1/file1");
+ map.put("/file2", "/tmp/singlefile2/file2");
+ map.put("/file3", "/tmp/multifile/file3");
+ map.put("/file4", "/tmp/multifile/file4");
+ map.put("/file5", "/tmp/multifile/file5");
+ map.put("/multifile/file3", "/tmp/multifile/file3");
+ map.put("/multifile/file4", "/tmp/multifile/file4");
+ map.put("/multifile/file5", "/tmp/multifile/file5");
+ map.put("/Ufile3", "/tmp/Umultifile/Ufile3");
+ map.put("/Ufile4", "/tmp/Umultifile/Ufile4");
+ map.put("/Ufile5", "/tmp/Umultifile/Ufile5");
+ map.put("/dir1", "/tmp/singledir/dir1");
+ map.put("/singledir/dir1", "/tmp/singledir/dir1");
+ map.put("/dir2", "/tmp/singledir/dir2");
+ map.put("/singledir/dir2", "/tmp/singledir/dir2");
+ map.put("/Udir1", "/tmp/Usingledir/Udir1");
+ map.put("/Udir2", "/tmp/Usingledir/Udir2");
+ map.put("/dir2/file6", "/tmp/singledir/dir2/file6");
+ map.put("/singledir/dir2/file6", "/tmp/singledir/dir2/file6");
+ map.put("/file7", "/tmp/singledir1/dir3/file7");
+ map.put("/file8", "/tmp/singledir1/dir3/file8");
+ map.put("/file9", "/tmp/singledir1/dir3/file9");
+ map.put("/dir3/file7", "/tmp/singledir1/dir3/file7");
+ map.put("/dir3/file8", "/tmp/singledir1/dir3/file8");
+ map.put("/dir3/file9", "/tmp/singledir1/dir3/file9");
+ map.put("/Ufile7", "/tmp/Usingledir1/Udir3/Ufile7");
+ map.put("/Ufile8", "/tmp/Usingledir1/Udir3/Ufile8");
+ map.put("/Ufile9", "/tmp/Usingledir1/Udir3/Ufile9");
+ }
+
+ @Test
+ public void testSingleFileMissingTarget() {
+ caseSingleFileMissingTarget(false);
+ caseSingleFileMissingTarget(true);
+ }
+
+ private void caseSingleFileMissingTarget(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singlefile1/file1");
+ createFiles("/tmp/singlefile1/file1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 0);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleFileTargetFile() {
+ caseSingleFileTargetFile(false);
+ caseSingleFileTargetFile(true);
+ }
+
+ private void caseSingleFileTargetFile(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singlefile1/file1");
+ createFiles("/tmp/singlefile1/file1", target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 0);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleFileTargetDir() {
+ caseSingleFileTargetDir(false);
+ caseSingleFileTargetDir(true);
+ }
+
+ private void caseSingleFileTargetDir(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singlefile2/file2");
+ createFiles("/tmp/singlefile2/file2");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleDirTargetMissing() {
+ caseSingleDirTargetMissing(false);
+ caseSingleDirTargetMissing(true);
+ }
+
+ private void caseSingleDirTargetMissing(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singledir");
+ mkdirs("/tmp/singledir/dir1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singledir");
+ mkdirs("/tmp/singledir/dir1");
+ mkdirs(target.toString());
+
+ runTest(listFile, target);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testUpdateSingleDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/Usingledir");
+ mkdirs("/tmp/Usingledir/Udir1");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, true);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiFileTargetPresent() {
+ caseMultiFileTargetPresent(false);
+ caseMultiFileTargetPresent(true);
+ }
+
+ private void caseMultiFileTargetPresent(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 3);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiFileTargetMissing() {
+ caseMultiFileTargetMissing(false);
+ caseMultiFileTargetMissing(true);
+ }
+
+ private void caseMultiFileTargetMissing(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 3);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile", "/tmp/singledir");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ mkdirs(target.toString(), "/tmp/singledir/dir1");
+
+ runTest(listFile, target);
+
+ checkResult(listFile, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testUpdateMultiDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/Umultifile", "/tmp/Usingledir");
+ createFiles("/tmp/Umultifile/Ufile3", "/tmp/Umultifile/Ufile4", "/tmp/Umultifile/Ufile5");
+ mkdirs(target.toString(), "/tmp/Usingledir/Udir1");
+
+ runTest(listFile, target, true);
+
+ checkResult(listFile, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiDirTargetMissing() {
+ caseMultiDirTargetMissing(false);
+ caseMultiDirTargetMissing(true);
+ }
+
+ private void caseMultiDirTargetMissing(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile", "/tmp/singledir");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ mkdirs("/tmp/singledir/dir1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testGlobTargetMissingSingleLevel() {
+ caseGlobTargetMissingSingleLevel(false);
+ caseGlobTargetMissingSingleLevel(true);
+ }
+
+ private void caseGlobTargetMissingSingleLevel(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/singledir/dir2/file6");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 5);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
+ @Test
+ public void testGlobTargetMissingMultiLevel() {
+ caseGlobTargetMissingMultiLevel(false);
+ caseGlobTargetMissingMultiLevel(true);
+ }
+
+ private void caseGlobTargetMissingMultiLevel(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*/*");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/singledir1/dir3/file7", "/tmp/singledir1/dir3/file8",
+ "/tmp/singledir1/dir3/file9");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 6);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
+ @Test
+ public void testGlobTargetDirMultiLevel() {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*/*");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/singledir1/dir3/file7", "/tmp/singledir1/dir3/file8",
+ "/tmp/singledir1/dir3/file9");
+ mkdirs(target.toString());
+
+ runTest(listFile, target);
+
+ checkResult(listFile, 6);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
+ @Test
+ public void testUpdateGlobTargetDirMultiLevel() {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*/*");
+ createFiles("/tmp/Umultifile/Ufile3", "/tmp/Umultifile/Ufile4", "/tmp/Umultifile/Ufile5");
+ createFiles("/tmp/Usingledir1/Udir3/Ufile7", "/tmp/Usingledir1/Udir3/Ufile8",
+ "/tmp/Usingledir1/Udir3/Ufile9");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, true);
+
+ checkResult(listFile, 6);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
+ private void addEntries(Path listFile, String... entries) throws IOException {
+ OutputStream out = fs.create(listFile);
+ try {
+ for (String entry : entries){
+ out.write(entry.getBytes());
+ out.write("\n".getBytes());
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+ private void createFiles(String... entries) throws IOException {
+ for (String entry : entries){
+ OutputStream out = fs.create(new Path(entry));
+ try {
+ out.write(entry.getBytes());
+ out.write("\n".getBytes());
+ } finally {
+ out.close();
+ }
+ }
+ }
+
+ private void mkdirs(String... entries) throws IOException {
+ for (String entry : entries){
+ fs.mkdirs(new Path(entry));
+ }
+ }
+
+ private void runTest(Path listFile, Path target) throws IOException {
+ runTest(listFile, target, true);
+ }
+
+ private void runTest(Path listFile, Path target, boolean sync) throws IOException {
+ CopyListing listing = new FileBasedCopyListing(config, CREDENTIALS);
+ DistCpOptions options = new DistCpOptions(listFile, target);
+ options.setSyncFolder(sync);
+ listing.buildListing(listFile, options);
+ }
+
+ private void checkResult(Path listFile, int count) throws IOException {
+ if (count == 0) {
+ return;
+ }
+
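+ // The built listing is a SequenceFile of (relative path, FileStatus)
+ // records; check each record against the expected-values map and count
+ // the entries.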
+ int recCount = 0;
+ SequenceFile.Reader reader = new SequenceFile.Reader(config,
+ SequenceFile.Reader.file(listFile));
+ try {
+ Text relPath = new Text();
+ FileStatus fileStatus = new FileStatus();
+ while (reader.next(relPath, fileStatus)) {
+ Assert.assertEquals(fileStatus.getPath().toUri().getPath(), map.get(relPath.toString()));
+ recCount++;
+ }
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ Assert.assertEquals(recCount, count);
+ }
+
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.security.Credentials;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
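+/**
+ * Tests for glob-based copy-listing construction: a small source tree is
+ * created on a MiniDFSCluster, a listing is built from the source path, and
+ * the resulting SequenceFile records are compared with the paths recorded
+ * during setup.
+ */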
+public class TestGlobbedCopyListing {
+
+ private static MiniDFSCluster cluster;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+ public static Map<String, String> expectedValues = new HashMap<String, String>();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+ createSourceData();
+ }
+
+ private static void createSourceData() throws Exception {
+ mkdirs("/tmp/source/1");
+ mkdirs("/tmp/source/2");
+ mkdirs("/tmp/source/2/3");
+ mkdirs("/tmp/source/2/3/4");
+ mkdirs("/tmp/source/5");
+ touchFile("/tmp/source/5/6");
+ mkdirs("/tmp/source/7");
+ mkdirs("/tmp/source/7/8");
+ touchFile("/tmp/source/7/8/9");
+ }
+
+ private static void mkdirs(String path) throws Exception {
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ fileSystem.mkdirs(new Path(path));
+ recordInExpectedValues(path);
+ }
+ finally {
+ IOUtils.cleanup(null, fileSystem);
+ }
+ }
+
+ private static void touchFile(String path) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ recordInExpectedValues(path);
+ }
+ finally {
+ IOUtils.cleanup(null, fileSystem, outputStream);
+ }
+ }
+
+ private static void recordInExpectedValues(String path) throws Exception {
+ FileSystem fileSystem = cluster.getFileSystem();
+ Path sourcePath = new Path(fileSystem.getUri().toString() + path);
+ expectedValues.put(sourcePath.toString(), DistCpUtils.getRelativePath(
+ new Path("/tmp/source"), sourcePath));
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ final URI uri = cluster.getFileSystem().getUri();
+ final String pathString = uri.toString();
+ Path fileSystemPath = new Path(pathString);
+ Path source = new Path(fileSystemPath.toString() + "/tmp/source");
+ Path target = new Path(fileSystemPath.toString() + "/tmp/target");
+ Path listingPath = new Path(fileSystemPath.toString() + "/tmp/META/fileList.seq");
+ DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+
+ new GlobbedCopyListing(new Configuration(), CREDENTIALS).buildListing(listingPath, options);
+
+ verifyContents(listingPath);
+ }
+
+ private void verifyContents(Path listingPath) throws Exception {
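+ // Read back the (relative-path key, FileStatus value) records and
+ // compare them with the expected map built during setup.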
+ SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
+ listingPath, new Configuration());
+ Text key = new Text();
+ FileStatus value = new FileStatus();
+ Map<String, String> actualValues = new HashMap<String, String>();
+ while (reader.next(key, value)) {
+ actualValues.put(value.getPath().toString(), key.toString());
+ }
+
+ Assert.assertEquals(expectedValues.size(), actualValues.size());
+ for (Map.Entry<String, String> entry : actualValues.entrySet()) {
+ Assert.assertEquals(entry.getValue(), expectedValues.get(entry.getKey()));
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
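+/**
+ * Integration tests that drive a full DistCp run (listing, copy, commit)
+ * against the local filesystem, using the local job runner.
+ */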
+public class TestIntegration {
+ private static final Log LOG = LogFactory.getLog(TestIntegration.class);
+
+ private static FileSystem fs;
+
+ private static Path listFile;
+ private static Path target;
+ private static String root;
+
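+ /** Local-mode configuration: local filesystem and the local job runner. */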
+ private static Configuration getConf() {
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "file:///");
+ conf.set("mapred.job.tracker", "local");
+ return conf;
+ }
+
+ @BeforeClass
+ public static void setup() {
+ try {
+ fs = FileSystem.get(getConf());
+ listFile = new Path("target/tmp/listing").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
+ target = new Path("target/tmp/target").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
+ root = new Path("target/tmp").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory()).toString();
+ TestDistCpUtils.delete(fs, root);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ }
+ }
+
+ @Test
+ public void testSingleFileMissingTarget() {
+ caseSingleFileMissingTarget(false);
+ caseSingleFileMissingTarget(true);
+ }
+
+ private void caseSingleFileMissingTarget(boolean sync) {
+
+ try {
+ addEntries(listFile, "singlefile1/file1");
+ createFiles("singlefile1/file1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(target, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testSingleFileTargetFile() {
+ caseSingleFileTargetFile(false);
+ caseSingleFileTargetFile(true);
+ }
+
+ private void caseSingleFileTargetFile(boolean sync) {
+
+ try {
+ addEntries(listFile, "singlefile1/file1");
+ createFiles("singlefile1/file1", target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(target, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testSingleFileTargetDir() {
+ caseSingleFileTargetDir(false);
+ caseSingleFileTargetDir(true);
+ }
+
+ private void caseSingleFileTargetDir(boolean sync) {
+
+ try {
+ addEntries(listFile, "singlefile2/file2");
+ createFiles("singlefile2/file2");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(target, 1, "file2");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testSingleDirTargetMissing() {
+ caseSingleDirTargetMissing(false);
+ caseSingleDirTargetMissing(true);
+ }
+
+ private void caseSingleDirTargetMissing(boolean sync) {
+
+ try {
+ addEntries(listFile, "singledir");
+ mkdirs(root + "/singledir/dir1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(target, 1, "dir1");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testSingleDirTargetPresent() {
+
+ try {
+ addEntries(listFile, "singledir");
+ mkdirs(root + "/singledir/dir1");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, false);
+
+ checkResult(target, 1, "singledir/dir1");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testUpdateSingleDirTargetPresent() {
+
+ try {
+ addEntries(listFile, "Usingledir");
+ mkdirs(root + "/Usingledir/Udir1");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, true);
+
+ checkResult(target, 1, "Udir1");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testMultiFileTargetPresent() {
+ caseMultiFileTargetPresent(false);
+ caseMultiFileTargetPresent(true);
+ }
+
+ private void caseMultiFileTargetPresent(boolean sync) {
+
+ try {
+ addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(target, 3, "file3", "file4", "file5");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testMultiFileTargetMissing() {
+ caseMultiFileTargetMissing(false);
+ caseMultiFileTargetMissing(true);
+ }
+
+ private void caseMultiFileTargetMissing(boolean sync) {
+
+ try {
+ addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+
+ runTest(listFile, target, sync);
+
+ checkResult(target, 3, "file3", "file4", "file5");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testMultiDirTargetPresent() {
+
+ try {
+ addEntries(listFile, "multifile", "singledir");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ mkdirs(target.toString(), root + "/singledir/dir1");
+
+ runTest(listFile, target, false);
+
+ checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testUpdateMultiDirTargetPresent() {
+
+ try {
+ addEntries(listFile, "Umultifile", "Usingledir");
+ createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5");
+ mkdirs(target.toString(), root + "/Usingledir/Udir1");
+
+ runTest(listFile, target, true);
+
+ checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testMultiDirTargetMissing() {
+
+ try {
+ addEntries(listFile, "multifile", "singledir");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ mkdirs(root + "/singledir/dir1");
+
+ runTest(listFile, target, false);
+
+ checkResult(target, 2, "multifile/file3", "multifile/file4",
+ "multifile/file5", "singledir/dir1");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testUpdateMultiDirTargetMissing() {
+
+ try {
+ addEntries(listFile, "multifile", "singledir");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ mkdirs(root + "/singledir/dir1");
+
+ runTest(listFile, target, true);
+
+ checkResult(target, 4, "file3", "file4", "file5", "dir1");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ @Test
+ public void testGlobTargetMissingSingleLevel() {
+
+ try {
+ Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
+ addEntries(listFile, "*");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ createFiles("singledir/dir2/file6");
+
+ runTest(listFile, target, false);
+
+ checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5",
+ "singledir/dir2/file6");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ TestDistCpUtils.delete(fs, "target/tmp1");
+ }
+ }
+
+ @Test
+ public void testUpdateGlobTargetMissingSingleLevel() {
+
+ try {
+ Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
+ addEntries(listFile, "*");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ createFiles("singledir/dir2/file6");
+
+ runTest(listFile, target, true);
+
+ checkResult(target, 4, "file3", "file4", "file5", "dir2/file6");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ TestDistCpUtils.delete(fs, "target/tmp1");
+ }
+ }
+
+ @Test
+ public void testGlobTargetMissingMultiLevel() {
+
+ try {
+ Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
+ addEntries(listFile, "*/*");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
+ "singledir1/dir3/file9");
+
+ runTest(listFile, target, false);
+
+ checkResult(target, 4, "file3", "file4", "file5",
+ "dir3/file7", "dir3/file8", "dir3/file9");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ TestDistCpUtils.delete(fs, "target/tmp1");
+ }
+ }
+
+ @Test
+ public void testUpdateGlobTargetMissingMultiLevel() {
+
+ try {
+ Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
+ addEntries(listFile, "*/*");
+ createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+ createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
+ "singledir1/dir3/file9");
+
+ runTest(listFile, target, true);
+
+ checkResult(target, 6, "file3", "file4", "file5",
+ "file7", "file8", "file9");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ TestDistCpUtils.delete(fs, "target/tmp1");
+ }
+ }
+
+ private void addEntries(Path listFile, String... entries) throws IOException {
+ OutputStream out = fs.create(listFile);
+ try {
+ for (String entry : entries){
+ out.write((root + "/" + entry).getBytes());
+ out.write("\n".getBytes());
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+ private void createFiles(String... entries) throws IOException {
+ for (String entry : entries){
+ OutputStream out = fs.create(new Path(root + "/" + entry));
+ try {
+ out.write((root + "/" + entry).getBytes());
+ out.write("\n".getBytes());
+ } finally {
+ out.close();
+ }
+ }
+ }
+
+ private void mkdirs(String... entries) throws IOException {
+ for (String entry : entries){
+ fs.mkdirs(new Path(entry));
+ }
+ }
+
+ private void runTest(Path listFile, Path target, boolean sync) throws IOException {
+ DistCpOptions options = new DistCpOptions(listFile, target);
+ options.setSyncFolder(sync);
+ try {
+ new DistCp(getConf(), options).execute();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new IOException(e);
+ }
+ }
+
+ private void checkResult(Path target, int count, String... relPaths) throws IOException {
+ Assert.assertEquals(count, fs.listStatus(target).length);
+ if (relPaths == null || relPaths.length == 0) {
+ Assert.assertTrue(target.toString(), fs.exists(target));
+ return;
+ }
+ for (String relPath : relPaths) {
+ Assert.assertTrue(new Path(target, relPath).toString(), fs.exists(new Path(target, relPath)));
+ }
+ }
+
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpOptions.*;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
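+/**
+ * Unit tests for the DistCp command-line parser: defaults, individual
+ * switches, invalid combinations, and attribute-preservation flags.
+ */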
+public class TestOptionsParser {
+
+ @Test
+ public void testParseIgnoreFailure() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldIgnoreFailures());
+
+ options = OptionsParser.parse(new String[] {
+ "-i",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldIgnoreFailures());
+ }
+
+ @Test
+ public void testParseOverwrite() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldOverwrite());
+
+ options = OptionsParser.parse(new String[] {
+ "-overwrite",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldOverwrite());
+
+ try {
+ OptionsParser.parse(new String[] {
+ "-update",
+ "-overwrite",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("Update and overwrite aren't allowed together");
+ } catch (IllegalArgumentException ignore) {
+ }
+ }
+
+ @Test
+ public void testLogPath() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertNull(options.getLogPath());
+
+ options = OptionsParser.parse(new String[] {
+ "-log",
+ "hdfs://localhost:8020/logs",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getLogPath(), new Path("hdfs://localhost:8020/logs"));
+ }
+
+ @Test
+ public void testParseBlocking() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldBlock());
+
+ options = OptionsParser.parse(new String[] {
+ "-async",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldBlock());
+ }
+
+ @Test
+ public void testParseBandwidth() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB);
+
+ options = OptionsParser.parse(new String[] {
+ "-bandwidth",
+ "11",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getMapBandwidth(), 11);
+ }
+
+ @Test
+ public void testParseSkipCRC() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldSkipCRC());
+
+ options = OptionsParser.parse(new String[] {
+ "-update",
+ "-skipcrccheck",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldSyncFolder());
+ Assert.assertTrue(options.shouldSkipCRC());
+ }
+
+ @Test
+ public void testParseAtomicCommit() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldAtomicCommit());
+
+ options = OptionsParser.parse(new String[] {
+ "-atomic",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldAtomicCommit());
+
+ try {
+ OptionsParser.parse(new String[] {
+ "-atomic",
+ "-update",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("Atomic and sync folders were allowed");
+ } catch (IllegalArgumentException ignore) { }
+ }
+
+ @Test
+ public void testParseWorkPath() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertNull(options.getAtomicWorkPath());
+
+ options = OptionsParser.parse(new String[] {
+ "-atomic",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertNull(options.getAtomicWorkPath());
+
+ options = OptionsParser.parse(new String[] {
+ "-atomic",
+ "-tmp",
+ "hdfs://localhost:8020/work",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getAtomicWorkPath(), new Path("hdfs://localhost:8020/work"));
+
+ try {
+ OptionsParser.parse(new String[] {
+ "-tmp",
+ "hdfs://localhost:8020/work",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("work path was allowed without -atomic switch");
+ } catch (IllegalArgumentException ignore) {}
+ }
+
+ @Test
+ public void testParseSyncFolders() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldSyncFolder());
+
+ options = OptionsParser.parse(new String[] {
+ "-update",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldSyncFolder());
+ }
+
+ @Test
+ public void testParseDeleteMissing() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldDeleteMissing());
+
+ options = OptionsParser.parse(new String[] {
+ "-update",
+ "-delete",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldSyncFolder());
+ Assert.assertTrue(options.shouldDeleteMissing());
+
+ options = OptionsParser.parse(new String[] {
+ "-overwrite",
+ "-delete",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldOverwrite());
+ Assert.assertTrue(options.shouldDeleteMissing());
+
+ try {
+ OptionsParser.parse(new String[] {
+ "-atomic",
+ "-delete",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("Atomic and delete folders were allowed");
+ } catch (IllegalArgumentException ignore) { }
+ }
+
+ @Test
+ public void testParseSSLConf() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertNull(options.getSslConfigurationFile());
+
+ options = OptionsParser.parse(new String[] {
+ "-mapredSslConf",
+ "/tmp/ssl-client.xml",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getSslConfigurationFile(), "/tmp/ssl-client.xml");
+ }
+
+ @Test
+ public void testParseMaps() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getMaxMaps(), DistCpConstants.DEFAULT_MAPS);
+
+ options = OptionsParser.parse(new String[] {
+ "-m",
+ "1",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getMaxMaps(), 1);
+
+ try {
+ OptionsParser.parse(new String[] {
+ "-m",
+ "hello",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("Non numberic map parsed");
+ } catch (IllegalArgumentException ignore) { }
+
+ try {
+ OptionsParser.parse(new String[] {
+ "-mapredXslConf",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("Non numberic map parsed");
+ } catch (IllegalArgumentException ignore) { }
+ }
+
+ @Test
+ public void testSourceListing() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getSourceFileListing(),
+ new Path("hdfs://localhost:8020/source/first"));
+ }
+
+ @Test
+ public void testSourceListingAndSourcePath() {
+ try {
+ OptionsParser.parse(new String[] {
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("Both source listing & source paths allowed");
+ } catch (IllegalArgumentException ignore) {}
+ }
+
+ @Test
+ public void testMissingSourceInfo() {
+ try {
+ OptionsParser.parse(new String[] {
+ "hdfs://localhost:8020/target/"});
+ Assert.fail("Neither source listing not source paths present");
+ } catch (IllegalArgumentException ignore) {}
+ }
+
+ @Test
+ public void testMissingTarget() {
+ try {
+ OptionsParser.parse(new String[] {
+ "-f", "hdfs://localhost:8020/source"});
+ Assert.fail("Missing target allowed");
+ } catch (IllegalArgumentException ignore) {}
+ }
+
+ @Test
+ public void testInvalidArgs() {
+ try {
+ OptionsParser.parse(new String[] {
+ "-m", "-f", "hdfs://localhost:8020/source"});
+ Assert.fail("Missing map value");
+ } catch (IllegalArgumentException ignore) {}
+ }
+
+ @Test
+ public void testToString() {
+ DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
+ String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " +
+ "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " +
+ "sourceFileListing=abc, sourcePaths=null, targetPath=xyz}";
+ Assert.assertEquals(val, option.toString());
+ Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
+ DistCpOptionSwitch.ATOMIC_COMMIT.name());
+ }
+
+ @Test
+ public void testCopyStrategy() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "-strategy",
+ "dynamic",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getCopyStrategy(), "dynamic");
+
+ options = OptionsParser.parse(new String[] {
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getCopyStrategy(), DistCpConstants.UNIFORMSIZE);
+ }
+
+ @Test
+ public void testTargetPath() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost:8020/target/"));
+ }
+
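+ // Verifies -p parsing: without -p nothing is preserved, bare -p
+ // preserves all five attributes, and a flag string such as -pbr
+ // preserves only the named attributes (block size and replication).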
+ @Test
+ public void testPreserve() {
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+
+ options = OptionsParser.parse(new String[] {
+ "-p",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+ options = OptionsParser.parse(new String[] {
+ "-p",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+ options = OptionsParser.parse(new String[] {
+ "-pbr",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+
+ options = OptionsParser.parse(new String[] {
+ "-pbrgup",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+ options = OptionsParser.parse(new String[] {
+ "-p",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ int i = 0;
+ Iterator<FileAttribute> attribIterator = options.preserveAttributes();
+ while (attribIterator.hasNext()) {
+ attribIterator.next();
+ i++;
+ }
+ Assert.assertEquals(i, 5);
+
+ try {
+ OptionsParser.parse(new String[] {
+ "-pabc",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target"});
+ Assert.fail("Invalid preserve attribute");
+ } catch (IllegalArgumentException ignore) {
+ } catch (NoSuchElementException ignore) {}
+
+ options = OptionsParser.parse(new String[] {
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+ options.preserve(FileAttribute.PERMISSION);
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+
+ options.preserve(FileAttribute.PERMISSION);
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+ }
+
+ @Test
+ public void testOptionsSwitchAddToConf() {
+ Configuration conf = new Configuration();
+ Assert.assertNull(conf.get(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel()));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT);
+ Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+ }
+
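+ // Verifies that appendToConf() propagates parsed switches into a
+ // Configuration, including the default bandwidth when none is given.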
+ @Test
+ public void testOptionsAppendToConf() {
+ Configuration conf = new Configuration();
+ Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
+ Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+ DistCpOptions options = OptionsParser.parse(new String[] {
+ "-atomic",
+ "-i",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ options.appendToConf(conf);
+ Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
+ Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+ Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
+ DistCpConstants.DEFAULT_BANDWIDTH_MB);
+
+ conf = new Configuration();
+ Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+ Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
+ Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), null);
+ options = OptionsParser.parse(new String[] {
+ "-update",
+ "-delete",
+ "-pu",
+ "-bandwidth",
+ "11",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ options.appendToConf(conf);
+ Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+ Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
+ Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
+ Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.GlobbedCopyListing;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.apache.hadoop.security.Credentials;
+import org.junit.*;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TestCopyCommitter {
+ private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);
+
+ private static final Random rand = new Random();
+
+ private static final Credentials CREDENTIALS = new Credentials();
+ public static final int PORT = 39737;
+
+ private static Configuration config;
+ private static MiniDFSCluster cluster;
+
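+ // Builds a throwaway job bound to a fixed local job-tracker port,
+ // with no-op input/output formats and no reducers.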
+ private static Job getJobForClient() throws IOException {
+ Job job = Job.getInstance(new Configuration());
+ job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
+ job.setInputFormatClass(NullInputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ @BeforeClass
+ public static void create() throws IOException {
+ config = getJobForClient().getConfiguration();
+ config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+ .build();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Before
+ public void createMetaFolder() {
+ config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
+ Path meta = new Path("/meta");
+ try {
+ cluster.getFileSystem().mkdirs(meta);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while creating meta folder", e);
+ Assert.fail("Unable to create meta folder");
+ }
+ }
+
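+ // The committer is expected to remove the meta folder as part of
+ // commit; if it still exists here, delete it and fail the test.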
+ @After
+ public void cleanupMetaFolder() {
+ Path meta = new Path("/meta");
+ try {
+ if (cluster.getFileSystem().exists(meta)) {
+ cluster.getFileSystem().delete(meta, true);
+ Assert.fail("Expected meta folder to be deleted");
+ }
+ } catch (IOException e) {
+ LOG.error("Exception encountered while cleaning up folder", e);
+ Assert.fail("Unable to clean up meta folder");
+ }
+ }
+
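+ // Commit with nothing to do should still succeed, and committing a
+ // second time must be idempotent.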
+ @Test
+ public void testNoCommitAction() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ committer.commitJob(jobContext);
+ Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
+
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Commit failed");
+ }
+ }
+
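+ // Verifies that commit propagates preserved permissions (here 0777)
+ // from the source directories onto the target directories.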
+ @Test
+ public void testPreserveStatus() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+ String sourceBase;
+ String targetBase;
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ FsPermission sourcePerm = new FsPermission((short) 511); // 0777
+ FsPermission initialPerm = new FsPermission((short) 448); // 0700
+ sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
+ targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
+
+ DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+ new Path("/out"));
+ options.preserve(FileAttribute.PERMISSION);
+ options.appendToConf(conf);
+
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+ Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+ listing.buildListing(listingFile, options);
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+
+ committer.commitJob(jobContext);
+ if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
+ Assert.fail("Permission don't match");
+ }
+
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
+ Assert.fail("Permission don't match");
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing for preserve status", e);
+ Assert.fail("Preserve status failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
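+ // With -update -delete, commit should remove target paths that have
+ // no counterpart in the source, leaving the two trees in sync.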
+ @Test
+ public void testDeleteMissing() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+ String sourceBase;
+ String targetBase;
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ sourceBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+ targetBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+ String targetBaseAdd = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+ fs.rename(new Path(targetBaseAdd), new Path(targetBase));
+
+ DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+ new Path("/out"));
+ options.setSyncFolder(true);
+ options.setDeleteMissing(true);
+ options.appendToConf(conf);
+
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+ Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+ listing.buildListing(listingFile, options);
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+ committer.commitJob(jobContext);
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+ } catch (Throwable e) {
+ LOG.error("Exception encountered while testing for delete missing", e);
+ Assert.fail("Delete missing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp1");
+ conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
+ }
+ }
+
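+ // Interleaved flat listings: only the four target files that also
+ // exist in the source (4, 5, 7, 9) should survive delete-missing.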
+ @Test
+ public void testDeleteMissingFlatInterleavedFiles() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+ String sourceBase;
+ String targetBase;
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
+ targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
+ TestDistCpUtils.createFile(fs, sourceBase + "/1");
+ TestDistCpUtils.createFile(fs, sourceBase + "/3");
+ TestDistCpUtils.createFile(fs, sourceBase + "/4");
+ TestDistCpUtils.createFile(fs, sourceBase + "/5");
+ TestDistCpUtils.createFile(fs, sourceBase + "/7");
+ TestDistCpUtils.createFile(fs, sourceBase + "/8");
+ TestDistCpUtils.createFile(fs, sourceBase + "/9");
+
+ TestDistCpUtils.createFile(fs, targetBase + "/2");
+ TestDistCpUtils.createFile(fs, targetBase + "/4");
+ TestDistCpUtils.createFile(fs, targetBase + "/5");
+ TestDistCpUtils.createFile(fs, targetBase + "/7");
+ TestDistCpUtils.createFile(fs, targetBase + "/9");
+ TestDistCpUtils.createFile(fs, targetBase + "/A");
+
+ DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+ new Path("/out"));
+ options.setSyncFolder(true);
+ options.setDeleteMissing(true);
+ options.appendToConf(conf);
+
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+ Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+ listing.buildListing(listingFile, options);
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+ committer.commitJob(jobContext);
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+ Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+ Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing for delete missing", e);
+ Assert.fail("Delete missing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp1");
+ conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
+ }
+ }
+
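+ // Atomic commit should move the work path to the (absent) final
+ // path, and repeating the commit must not undo that move.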
+ @Test
+ public void testAtomicCommitMissingFinal() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+ String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
+ String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ fs.mkdirs(new Path(workPath));
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+ conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+
+ Assert.assertTrue(fs.exists(new Path(workPath)));
+ Assert.assertFalse(fs.exists(new Path(finalPath)));
+ committer.commitJob(jobContext);
+ Assert.assertFalse(fs.exists(new Path(workPath)));
+ Assert.assertTrue(fs.exists(new Path(finalPath)));
+
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ Assert.assertFalse(fs.exists(new Path(workPath)));
+ Assert.assertTrue(fs.exists(new Path(finalPath)));
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing for preserve status", e);
+ Assert.fail("Atomic commit failure");
+ } finally {
+ TestDistCpUtils.delete(fs, workPath);
+ TestDistCpUtils.delete(fs, finalPath);
+ }
+ }
+
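+ // Atomic commit must refuse to overwrite a pre-existing final path,
+ // leaving both the work and final paths untouched.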
+ @Test
+ public void testAtomicCommitExistingFinal() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+ String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
+ String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ fs.mkdirs(new Path(workPath));
+ fs.mkdirs(new Path(finalPath));
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+ conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+
+ Assert.assertTrue(fs.exists(new Path(workPath)));
+ Assert.assertTrue(fs.exists(new Path(finalPath)));
+ try {
+ committer.commitJob(jobContext);
+ Assert.fail("Should not be able to atomic-commit to pre-existing path.");
+ } catch(Exception exception) {
+ Assert.assertTrue(fs.exists(new Path(workPath)));
+ Assert.assertTrue(fs.exists(new Path(finalPath)));
+ LOG.info("Atomic-commit Test pass.");
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing for atomic commit.", e);
+ Assert.fail("Atomic commit failure");
+ } finally {
+ TestDistCpUtils.delete(fs, workPath);
+ TestDistCpUtils.delete(fs, finalPath);
+ }
+ }
+
+ private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
+ return new TaskAttemptContextImpl(conf,
+ new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
+ }
+
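+ // Walks the target tree with an explicit stack and asserts that
+ // every subdirectory carries the expected source permission.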
+ private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
+ FsPermission sourcePerm) throws IOException {
+ Path base = new Path(targetBase);
+
+ Stack<Path> stack = new Stack<Path>();
+ stack.push(base);
+ while (!stack.isEmpty()) {
+ Path file = stack.pop();
+ if (!fs.exists(file)) continue;
+ FileStatus[] fStatus = fs.listStatus(file);
+ if (fStatus == null || fStatus.length == 0) continue;
+
+ for (FileStatus status : fStatus) {
+ if (status.isDirectory()) {
+ stack.push(status.getPath());
+ Assert.assertEquals(status.getPermission(), sourcePerm);
+ }
+ }
+ }
+ return true;
+ }
+
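+ // A no-op InputFormat: no splits and no records, used only so the
+ // stub job can be constructed.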
+ private static class NullInputFormat extends InputFormat<Object, Object> {
+ @Override
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public RecordReader<Object, Object> createRecordReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return null;
+ }
+ }
+}