You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:50:03 UTC
svn commit: r1236049 [5/6] - in /hadoop/common/branches/branch-0.23:
hadoop-project/ hadoop-tools/ hadoop-tools/hadoop-distcp/
hadoop-tools/hadoop-distcp/src/ hadoop-tools/hadoop-distcp/src/main/
hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/h...
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.GlobbedCopyListing;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.apache.hadoop.security.Credentials;
+import org.junit.*;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TestCopyCommitter {
+ private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);
+
+ private static final Random rand = new Random();
+
+ private static final Credentials CREDENTIALS = new Credentials();
+ public static final int PORT = 39737;
+
+
+ private static Configuration config;
+ private static MiniDFSCluster cluster;
+
+ private static Job getJobForClient() throws IOException {
+ Job job = Job.getInstance(new Configuration());
+ job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
+ job.setInputFormatClass(NullInputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ /**
+ * Prepares the shared configuration and starts a freshly formatted
+ * single-datanode mini DFS cluster for all tests in this class.
+ *
+ * @throws IOException if the job configuration or the cluster cannot be built
+ */
+ @BeforeClass
+ public static void create() throws IOException {
+   config = getJobForClient().getConfiguration();
+   config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
+   final MiniDFSCluster.Builder clusterBuilder =
+       new MiniDFSCluster.Builder(config).numDataNodes(1).format(true);
+   cluster = clusterBuilder.build();
+ }
+
+ /** Shuts the mini DFS cluster down, if one was started. */
+ @AfterClass
+ public static void destroy() {
+   if (cluster == null) {
+     return;
+   }
+   cluster.shutdown();
+ }
+
+ @Before
+ public void createMetaFolder() {
+ config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
+ Path meta = new Path("/meta");
+ try {
+ cluster.getFileSystem().mkdirs(meta);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while creating meta folder", e);
+ Assert.fail("Unable to create meta folder");
+ }
+ }
+
+ @After
+ public void cleanupMetaFolder() {
+ Path meta = new Path("/meta");
+ try {
+ if (cluster.getFileSystem().exists(meta)) {
+ cluster.getFileSystem().delete(meta, true);
+ Assert.fail("Expected meta folder to be deleted");
+ }
+ } catch (IOException e) {
+ LOG.error("Exception encountered while cleaning up folder", e);
+ Assert.fail("Unable to clean up meta folder");
+ }
+ }
+
+ /**
+ * Commits a job with no extra options set and checks that the task status is
+ * "Commit Successful", both on the first commit and on a repeated one.
+ */
+ @Test
+ public void testNoCommitAction() {
+   TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+   JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+       taskAttemptContext.getTaskAttemptID().getJobID());
+   try {
+     OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+     committer.commitJob(jobContext);
+     // JUnit expects (expected, actual); the reverse yields misleading failure text.
+     Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
+
+     // Test for idempotent commit
+     committer.commitJob(jobContext);
+     Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
+   } catch (IOException e) {
+     LOG.error("Exception encountered ", e);
+     Assert.fail("Commit failed");
+   }
+ }
+
+ /**
+ * Checks permission preservation at commit time: the target tree is set up with
+ * a different permission than the source, and after commitJob every directory
+ * under the target work path must carry the source permission. The commit is
+ * run twice to confirm a repeated commit gives the same result.
+ */
+ @Test
+ public void testPreserveStatus() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+
+ String sourceBase;
+ String targetBase;
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ // 511 decimal == 0777 octal; 448 decimal == 0700 octal.
+ FsPermission sourcePerm = new FsPermission((short) 511);
+ FsPermission initialPerm = new FsPermission((short) 448);
+ // NOTE(review): createTestSetup presumably builds a directory tree with the
+ // given permission and returns its base path - confirm in TestDistCpUtils.
+ sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
+ targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
+
+ DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+ new Path("/out"));
+ options.preserve(FileAttribute.PERMISSION);
+ options.appendToConf(conf);
+
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+ Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+ listing.buildListing(listingFile, options);
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+
+ committer.commitJob(jobContext);
+ if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
+ Assert.fail("Permission don't match");
+ }
+
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
+ Assert.fail("Permission don't match");
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing for preserve status", e);
+ Assert.fail("Preserve status failure");
+ } finally {
+ // fs is still null here if FileSystem.get (or anything before it) threw.
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+
+ }
+
+ /**
+ * Commits with sync-folder and delete-missing enabled against a target that
+ * holds an additional tree renamed into it, then checks in both directions
+ * that source and target are in sync. The commit is repeated to confirm a
+ * second run gives the same result.
+ */
+ @Test
+ public void testDeleteMissing() {
+   TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+   JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+       taskAttemptContext.getTaskAttemptID().getJobID());
+   Configuration conf = jobContext.getConfiguration();
+
+   String sourceBase;
+   String targetBase;
+   FileSystem fs = null;
+   try {
+     OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+     fs = FileSystem.get(conf);
+     sourceBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+     targetBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+     // Extra content moved into the target so there is something to delete.
+     String targetBaseAdd = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+     fs.rename(new Path(targetBaseAdd), new Path(targetBase));
+
+     DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+         new Path("/out"));
+     options.setSyncFolder(true);
+     options.setDeleteMissing(true);
+     options.appendToConf(conf);
+
+     CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+     Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+     listing.buildListing(listingFile, options);
+
+     conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+     conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+     committer.commitJob(jobContext);
+     if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+       Assert.fail("Source and target folders are not in sync");
+     }
+     if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
+       Assert.fail("Source and target folders are not in sync");
+     }
+
+     // Test for idempotent commit
+     committer.commitJob(jobContext);
+     if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+       Assert.fail("Source and target folders are not in sync");
+     }
+     if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
+       Assert.fail("Source and target folders are not in sync");
+     }
+   } catch (IOException e) {
+     // Only I/O failures are translated. Catching Throwable here would also
+     // swallow the AssertionErrors raised above and hide their messages.
+     LOG.error("Exception encountered while testing for delete missing", e);
+     Assert.fail("Delete missing failure");
+   } finally {
+     TestDistCpUtils.delete(fs, "/tmp1");
+     conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
+   }
+ }
+
+ @Test
+ public void testDeleteMissingFlatInterleavedFiles() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+
+ String sourceBase;
+ String targetBase;
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
+ targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
+ TestDistCpUtils.createFile(fs, sourceBase + "/1");
+ TestDistCpUtils.createFile(fs, sourceBase + "/3");
+ TestDistCpUtils.createFile(fs, sourceBase + "/4");
+ TestDistCpUtils.createFile(fs, sourceBase + "/5");
+ TestDistCpUtils.createFile(fs, sourceBase + "/7");
+ TestDistCpUtils.createFile(fs, sourceBase + "/8");
+ TestDistCpUtils.createFile(fs, sourceBase + "/9");
+
+ TestDistCpUtils.createFile(fs, targetBase + "/2");
+ TestDistCpUtils.createFile(fs, targetBase + "/4");
+ TestDistCpUtils.createFile(fs, targetBase + "/5");
+ TestDistCpUtils.createFile(fs, targetBase + "/7");
+ TestDistCpUtils.createFile(fs, targetBase + "/9");
+ TestDistCpUtils.createFile(fs, targetBase + "/A");
+
+ DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+ new Path("/out"));
+ options.setSyncFolder(true);
+ options.setDeleteMissing(true);
+ options.appendToConf(conf);
+
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+ Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+ listing.buildListing(listingFile, options);
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+ committer.commitJob(jobContext);
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+ Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+ Assert.fail("Source and target folders are not in sync");
+ }
+ Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing for delete missing", e);
+ Assert.fail("Delete missing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp1");
+ conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
+ }
+
+ }
+
+ /**
+ * Atomic commit when the final path does not exist yet: after commitJob the
+ * work path must be gone and the final path must exist. The commit is
+ * repeated to confirm the same end state.
+ */
+ @Test
+ public void testAtomicCommitMissingFinal() {
+   TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+   JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+       taskAttemptContext.getTaskAttemptID().getJobID());
+   Configuration conf = jobContext.getConfiguration();
+
+   String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
+   String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
+   FileSystem fs = null;
+   try {
+     OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+     fs = FileSystem.get(conf);
+     fs.mkdirs(new Path(workPath));
+
+     conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
+     conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+     conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+
+     Assert.assertTrue(fs.exists(new Path(workPath)));
+     Assert.assertFalse(fs.exists(new Path(finalPath)));
+     committer.commitJob(jobContext);
+     Assert.assertFalse(fs.exists(new Path(workPath)));
+     Assert.assertTrue(fs.exists(new Path(finalPath)));
+
+     // Test for idempotent commit
+     committer.commitJob(jobContext);
+     Assert.assertFalse(fs.exists(new Path(workPath)));
+     Assert.assertTrue(fs.exists(new Path(finalPath)));
+
+   } catch (IOException e) {
+     // The message used to say "preserve status", copied from another test.
+     LOG.error("Exception encountered while testing for atomic commit.", e);
+     Assert.fail("Atomic commit failure");
+   } finally {
+     TestDistCpUtils.delete(fs, workPath);
+     TestDistCpUtils.delete(fs, finalPath);
+   }
+ }
+
+ /**
+ * Atomic commit when the final path already exists: commitJob is expected to
+ * throw, and both the work path and the final path must still exist afterwards.
+ */
+ @Test
+ public void testAtomicCommitExistingFinal() {
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+
+ String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
+ String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
+ FileSystem fs = null;
+ try {
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ fs = FileSystem.get(conf);
+ fs.mkdirs(new Path(workPath));
+ fs.mkdirs(new Path(finalPath));
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+ conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+
+ Assert.assertTrue(fs.exists(new Path(workPath)));
+ Assert.assertTrue(fs.exists(new Path(finalPath)));
+ try {
+ committer.commitJob(jobContext);
+ // Assert.fail throws AssertionError, which is an Error and therefore is
+ // not caught by the catch (Exception) below.
+ Assert.fail("Should not be able to atomic-commit to pre-existing path.");
+ } catch(Exception exception) {
+ // Expected path: the failed commit must not have moved or removed anything.
+ Assert.assertTrue(fs.exists(new Path(workPath)));
+ Assert.assertTrue(fs.exists(new Path(finalPath)));
+ LOG.info("Atomic-commit Test pass.");
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing for atomic commit.", e);
+ Assert.fail("Atomic commit failure");
+ } finally {
+ TestDistCpUtils.delete(fs, workPath);
+ TestDistCpUtils.delete(fs, finalPath);
+ }
+ }
+
+ private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
+ return new TaskAttemptContextImpl(conf,
+ new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
+ }
+
+ /**
+ * Walks the tree rooted at {@code targetBase} and asserts that every
+ * directory found below it carries {@code sourcePerm}. The root itself is
+ * only listed, not checked.
+ *
+ * @param fs file system to inspect
+ * @param targetBase path of the tree root
+ * @param sourcePerm permission each sub-directory is expected to have
+ * @return always {@code true}; a mismatch surfaces as a failed JUnit
+ *         assertion rather than a {@code false} return
+ * @throws IOException if the file system cannot be queried
+ */
+ private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
+     FsPermission sourcePerm) throws IOException {
+   Deque<Path> pending = new ArrayDeque<Path>();
+   pending.push(new Path(targetBase));
+   while (!pending.isEmpty()) {
+     Path dir = pending.pop();
+     if (!fs.exists(dir)) {
+       continue;
+     }
+     FileStatus[] children = fs.listStatus(dir);
+     if (children == null) {
+       continue;
+     }
+     for (FileStatus child : children) {
+       if (child.isDirectory()) {
+         pending.push(child.getPath());
+         // (expected, actual) order so a failure reports the right sides.
+         Assert.assertEquals(sourcePerm, child.getPermission());
+       }
+     }
+   }
+   return true;
+ }
+
+ /** Input format stub that yields no splits and no record reader. */
+ private static class NullInputFormat extends InputFormat {
+   /** Always returns an empty split list. */
+   @Override
+   public List getSplits(JobContext context)
+       throws IOException, InterruptedException {
+     final List noSplits = Collections.EMPTY_LIST;
+     return noSplits;
+   }
+
+   /** Always returns {@code null}. */
+   @Override
+   public RecordReader createRecordReader(InputSplit split,
+       TaskAttemptContext context)
+       throws IOException, InterruptedException {
+     return null;
+   }
+ }
+}
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,826 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+public class TestCopyMapper {
+ private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
+ private static List<Path> pathList = new ArrayList<Path>();
+ private static int nFiles = 0;
+ private static final int FILE_SIZE = 1024;
+
+ private static MiniDFSCluster cluster;
+
+ private static final String SOURCE_PATH = "/tmp/source";
+ private static final String TARGET_PATH = "/tmp/target";
+
+ private static Configuration configuration;
+
+ /**
+ * Creates the shared configuration and starts a freshly formatted
+ * single-datanode mini DFS cluster for all tests in this class.
+ *
+ * @throws Exception if the configuration or the cluster cannot be created
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+   configuration = getConfigurationForCluster();
+   final MiniDFSCluster.Builder clusterBuilder =
+       new MiniDFSCluster.Builder(configuration).numDataNodes(1).format(true);
+   cluster = clusterBuilder.build();
+ }
+
+ private static Configuration getConfigurationForCluster() throws IOException {
+ Configuration configuration = new Configuration();
+ System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+ configuration.set("hadoop.log.dir", "target/tmp");
+ LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
+ LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+ return configuration;
+ }
+
+ private static Configuration getConfiguration() throws IOException {
+ Configuration configuration = getConfigurationForCluster();
+ final FileSystem fs = cluster.getFileSystem();
+ Path workPath = new Path(TARGET_PATH)
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+ workPath.toString());
+ configuration.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+ workPath.toString());
+ configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+ false);
+ configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+ true);
+ configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
+ true);
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+ "br");
+ return configuration;
+ }
+
+ /**
+ * Populates SOURCE_PATH with a small fixture: directories 1, 2, 2/3, 2/3/4,
+ * 5, 7 and 7/8, plus the files 5/6 and 7/8/9. Each call goes through
+ * mkdirs/touchFile, which append to pathList in call order, so the order of
+ * the statements below determines the order of pathList.
+ *
+ * @throws Exception if any directory or file cannot be created
+ */
+ private static void createSourceData() throws Exception {
+ mkdirs(SOURCE_PATH + "/1");
+ mkdirs(SOURCE_PATH + "/2");
+ // The deeper path is requested before its parent 2/3.
+ mkdirs(SOURCE_PATH + "/2/3/4");
+ mkdirs(SOURCE_PATH + "/2/3");
+ mkdirs(SOURCE_PATH + "/5");
+ touchFile(SOURCE_PATH + "/5/6");
+ mkdirs(SOURCE_PATH + "/7");
+ mkdirs(SOURCE_PATH + "/7/8");
+ touchFile(SOURCE_PATH + "/7/8/9");
+ }
+
+ private static void mkdirs(String path) throws Exception {
+ FileSystem fileSystem = cluster.getFileSystem();
+ final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
+ fileSystem.getWorkingDirectory());
+ pathList.add(qualifiedPath);
+ fileSystem.mkdirs(qualifiedPath);
+ }
+
+ private static void touchFile(String path) throws Exception {
+ FileSystem fs;
+ DataOutputStream outputStream = null;
+ try {
+ fs = cluster.getFileSystem();
+ final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
+ final long blockSize = fs.getDefaultBlockSize() * 2;
+ outputStream = fs.create(qualifiedPath, true, 0,
+ (short)(fs.getDefaultReplication()*2),
+ blockSize);
+ outputStream.write(new byte[FILE_SIZE]);
+ pathList.add(qualifiedPath);
+ ++nFiles;
+
+ FileStatus fileStatus = fs.getFileStatus(qualifiedPath);
+ System.out.println(fileStatus.getBlockSize());
+ System.out.println(fileStatus.getReplication());
+ }
+ finally {
+ IOUtils.cleanup(null, outputStream);
+ }
+ }
+
+ /**
+ * Runs the mapper over every path in the source fixture and verifies that
+ * each one exists under the target with the same file/directory kind,
+ * replication, block size and (for files) checksum. It also checks the COPY
+ * and BYTESCOPIED counters, then maps everything a second time and expects
+ * every emitted value to start with "SKIP:".
+ */
+ @Test
+ public void testRun() {
+   try {
+     deleteState();
+     createSourceData();
+
+     FileSystem fs = cluster.getFileSystem();
+     CopyMapper copyMapper = new CopyMapper();
+     StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+     Mapper<Text, FileStatus, Text, Text>.Context context
+         = stubContext.getContext();
+     copyMapper.setup(context);
+
+     for (Path path : pathList) {
+       copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+           fs.getFileStatus(path), context);
+     }
+
+     // Check that the maps worked.
+     for (Path path : pathList) {
+       // Literal replacement: the paths are plain strings, not regexes.
+       final Path targetPath = new Path(path.toString()
+           .replace(SOURCE_PATH, TARGET_PATH));
+       Assert.assertTrue(fs.exists(targetPath));
+       Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+       Assert.assertEquals(fs.getFileStatus(path).getReplication(),
+           fs.getFileStatus(targetPath).getReplication());
+       Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
+           fs.getFileStatus(targetPath).getBlockSize());
+       Assert.assertTrue(!fs.isFile(targetPath) ||
+           fs.getFileChecksum(targetPath).equals(
+               fs.getFileChecksum(path)));
+     }
+
+     Assert.assertEquals(pathList.size(),
+         stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
+     Assert.assertEquals(nFiles * FILE_SIZE,
+         stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+
+     testCopyingExistingFiles(fs, copyMapper, context);
+     for (Text value : stubContext.getWriter().values()) {
+       Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
+     }
+   }
+   catch (Exception e) {
+     LOG.error("Unexpected exception: ", e);
+     // Carry the cause into the failure instead of a bare assertTrue(false).
+     Assert.fail("Unexpected exception: " + e);
+   }
+ }
+
+ private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
+ Mapper<Text, FileStatus, Text, Text>.Context context) {
+
+ try {
+ for (Path path : pathList) {
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ fs.getFileStatus(path), context);
+ }
+
+ Assert.assertEquals(nFiles,
+ context.getCounter(CopyMapper.Counter.SKIP).getValue());
+ }
+ catch (Exception exception) {
+ Assert.assertTrue("Caught unexpected exception:" + exception.getMessage(),
+ false);
+ }
+ }
+
+ // Points the target work path at an hftp URI made of glob characters and
+ // expects mapping the first source path to throw.
+ // NOTE(review): the empty catch below also swallows exceptions thrown by
+ // deleteState/createSourceData/setup, so a broken fixture makes this test
+ // pass as well. The assertTrue(..., false) raises AssertionError, which is an
+ // Error rather than an Exception, so it is not swallowed.
+ @Test
+ public void testMakeDirFailure() {
+ try {
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
+ String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
+ configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+ workPath);
+ copyMapper.setup(context);
+
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
+ fs.getFileStatus(pathList.get(0)), context);
+
+ // Reached only if map() did not throw.
+ Assert.assertTrue("There should have been an exception.", false);
+ }
+ catch (Exception ignore) {
+ }
+ }
+
+ /** Runs the ignore-failures scenario once with the flag on, then once with it off. */
+ @Test
+ public void testIgnoreFailures() {
+   for (boolean ignoreFailures : new boolean[] {true, false}) {
+     doTestIgnoreFailures(ignoreFailures);
+   }
+ }
+
+ /**
+ * Maps a source directory onto a target path that already exists as a file.
+ * If the mapper throws an IOException, its message must start with
+ * "Can't replace".
+ */
+ @Test
+ public void testDirToFile() {
+ try {
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ // Same relative path on both sides: a directory in the source, a file in the target.
+ mkdirs(SOURCE_PATH + "/src/file");
+ touchFile(TARGET_PATH + "/src/file");
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ // NOTE(review): there is no Assert.fail here, so the test also passes when
+ // map() does not throw at all - confirm whether that is intended.
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+ }
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Runs the mapper as the remote user "guest" with every file attribute marked
+ * for preservation and expects the copy of /src/file to fail with an
+ * AccessControlException.
+ */
+ @Test
+ public void testPreserve() {
+ try {
+ deleteState();
+ createSourceData();
+
+ UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+ final CopyMapper copyMapper = new CopyMapper();
+
+ // The mapper context is created inside doAs, i.e. as the guest user.
+ final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
+ doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+ @Override
+ public Mapper<Text, FileStatus, Text, Text>.Context run() {
+ try {
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ return stubContext.getContext();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ // Ask for all attributes to be preserved.
+ EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+ EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+ context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+ DistCpUtils.packAttributes(preserveStatus));
+
+ touchFile(SOURCE_PATH + "/src/file");
+ mkdirs(TARGET_PATH);
+ // 511 decimal == 0777 octal, set on the target directory.
+ cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511));
+
+ final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+ @Override
+ public FileSystem run() {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ // Unreachable in practice: Assert.fail above always throws.
+ throw new RuntimeException("Test ought to fail here");
+ }
+ }
+ });
+
+ tmpUser.doAs(new PrivilegedAction<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ Assert.fail("Expected copy to fail");
+ } catch (AccessControlException e) {
+ // Expected outcome; this assertion is a no-op that only names the exception.
+ Assert.assertTrue("Got exception: " + e.getMessage(), true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Runs the mapper as the remote user "guest" on a source file that is
+ * read-only for owner, group and others, with the target directory set to
+ * 0777. The test passes when mapping /src/file completes without an exception.
+ */
+ @Test
+ public void testCopyReadableFiles() {
+ try {
+ deleteState();
+ createSourceData();
+
+ UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+ final CopyMapper copyMapper = new CopyMapper();
+
+ // The mapper context is created inside doAs, i.e. as the guest user.
+ final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
+ doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+ @Override
+ public Mapper<Text, FileStatus, Text, Text>.Context run() {
+ try {
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ return stubContext.getContext();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ touchFile(SOURCE_PATH + "/src/file");
+ mkdirs(TARGET_PATH);
+ cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+ new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+ // 511 decimal == 0777 octal, set on the target directory.
+ cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511));
+
+ final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+ @Override
+ public FileSystem run() {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ // Unreachable in practice: Assert.fail above always throws.
+ throw new RuntimeException("Test ought to fail here");
+ }
+ }
+ });
+
+ tmpUser.doAs(new PrivilegedAction<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ } catch (Exception e) {
+ // Any failure is rethrown and reported by the outer catch.
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ /**
+  * Runs the mapper as the remote user "guest" against a source file and a
+  * pre-existing target file that have both been made read-only, with every
+  * file attribute marked for preservation. The mapper is expected to emit
+  * exactly one record, starting with "SKIP" and naming the source file.
+  */
+ @Test
+ public void testSkipCopyNoPerms() {
+   try {
+     deleteState();
+     createSourceData();
+
+     UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+     final CopyMapper copyMapper = new CopyMapper();
+
+     // The stub context is built inside doAs() so it is created as "guest".
+     final StubContext stubContext = tmpUser.
+         doAs(new PrivilegedAction<StubContext>() {
+       @Override
+       public StubContext run() {
+         try {
+           return new StubContext(getConfiguration(), null, 0);
+         } catch (Exception e) {
+           LOG.error("Exception encountered ", e);
+           throw new RuntimeException(e);
+         }
+       }
+     });
+
+     final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+     EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+         EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+     context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+         DistCpUtils.packAttributes(preserveStatus));
+
+     // Same file on both sides, readable (only) by everybody.
+     touchFile(SOURCE_PATH + "/src/file");
+     touchFile(TARGET_PATH + "/src/file");
+     cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+         new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+     cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+         new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+     final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+       @Override
+       public FileSystem run() {
+         try {
+           return FileSystem.get(configuration);
+         } catch (IOException e) {
+           LOG.error("Exception encountered ", e);
+           Assert.fail("Test failed: " + e.getMessage());
+           // Never reached at runtime; required so that the method compiles.
+           throw new RuntimeException("Test ought to fail here");
+         }
+       }
+     });
+
+     tmpUser.doAs(new PrivilegedAction<Integer>() {
+       @Override
+       public Integer run() {
+         try {
+           copyMapper.setup(context);
+           copyMapper.map(new Text("/src/file"),
+               tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+               context);
+           // JUnit convention: expected value first, actual value second.
+           Assert.assertEquals("Exactly one record should have been written",
+               1, stubContext.getWriter().values().size());
+           final String record = stubContext.getWriter().values().get(0).toString();
+           Assert.assertTrue("Record should be a SKIP record: " + record,
+               record.startsWith("SKIP"));
+           Assert.assertTrue("Record should name the source file: " + record,
+               record.contains(SOURCE_PATH + "/src/file"));
+         } catch (Exception e) {
+           throw new RuntimeException(e);
+         }
+         return null;
+       }
+     });
+   } catch (Exception e) {
+     LOG.error("Exception encountered ", e);
+     Assert.fail("Test failed: " + e.getMessage());
+   }
+ }
+
+ /**
+  * Runs the mapper as the remote user "guest" when the read-only target file
+  * already exists with content ("hello world") while the source file was only
+  * touched. The copy is expected to fail with an AccessControlException,
+  * either thrown directly or as the cause of another exception.
+  */
+ @Test
+ public void testFailCopyWithAccessControlException() {
+   try {
+     deleteState();
+     createSourceData();
+
+     UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+     final CopyMapper copyMapper = new CopyMapper();
+
+     final StubContext stubContext = tmpUser.
+         doAs(new PrivilegedAction<StubContext>() {
+       @Override
+       public StubContext run() {
+         try {
+           return new StubContext(getConfiguration(), null, 0);
+         } catch (Exception e) {
+           LOG.error("Exception encountered ", e);
+           throw new RuntimeException(e);
+         }
+       }
+     });
+
+     EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+         EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+     final Mapper<Text, FileStatus, Text, Text>.Context context
+         = stubContext.getContext();
+
+     context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+         DistCpUtils.packAttributes(preserveStatus));
+
+     touchFile(SOURCE_PATH + "/src/file");
+     OutputStream out = cluster.getFileSystem().create(new Path(TARGET_PATH + "/src/file"));
+     try {
+       out.write("hello world".getBytes());
+     } finally {
+       // Release the stream even if the write fails.
+       out.close();
+     }
+     cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+         new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+     cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+         new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+     final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+       @Override
+       public FileSystem run() {
+         try {
+           return FileSystem.get(configuration);
+         } catch (IOException e) {
+           LOG.error("Exception encountered ", e);
+           Assert.fail("Test failed: " + e.getMessage());
+           // Never reached at runtime; required so that the method compiles.
+           throw new RuntimeException("Test ought to fail here");
+         }
+       }
+     });
+
+     tmpUser.doAs(new PrivilegedAction<Integer>() {
+       @Override
+       public Integer run() {
+         try {
+           copyMapper.setup(context);
+           copyMapper.map(new Text("/src/file"),
+               tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+               context);
+           Assert.fail("Didn't expect the file to be copied");
+         } catch (AccessControlException ignore) {
+           // Expected outcome: the copy was rejected.
+         } catch (Exception e) {
+           // Also accepted when the AccessControlException arrives wrapped;
+           // instanceof is false for a null cause, so no separate null check.
+           if (!(e.getCause() instanceof AccessControlException)) {
+             throw new RuntimeException(e);
+           }
+         }
+         return null;
+       }
+     });
+   } catch (Exception e) {
+     LOG.error("Exception encountered ", e);
+     Assert.fail("Test failed: " + e.getMessage());
+   }
+ }
+
+ /**
+  * Maps a source file onto a target path that already exists as a directory
+  * and expects the mapper to refuse with an IOException whose message starts
+  * with "Can't replace".
+  */
+ @Test
+ public void testFileToDir() {
+   try {
+     deleteState();
+     createSourceData();
+
+     FileSystem fs = cluster.getFileSystem();
+     CopyMapper copyMapper = new CopyMapper();
+     StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+     Mapper<Text, FileStatus, Text, Text>.Context context
+         = stubContext.getContext();
+
+     touchFile(SOURCE_PATH + "/src/file");
+     mkdirs(TARGET_PATH + "/src/file");
+     try {
+       copyMapper.setup(context);
+       copyMapper.map(new Text("/src/file"),
+           fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+           context);
+       // Without this the test would pass vacuously when nothing is thrown.
+       Assert.fail("Expected an IOException when replacing a directory with a file");
+     } catch (IOException e) {
+       Assert.assertTrue("Unexpected failure message: " + e.getMessage(),
+           e.getMessage().startsWith("Can't replace"));
+     }
+   } catch (Exception e) {
+     LOG.error("Exception encountered ", e);
+     Assert.fail("Test failed: " + e.getMessage());
+   }
+ }
+
+ /**
+  * Deletes every source file right before handing its (stale) status to the
+  * mapper and checks the reaction for the given ignore-failures setting.
+  * With {@code ignoreFailures} set, map() must not throw and every record
+  * written must start with "FAIL:"; otherwise an exception is expected.
+  *
+  * @param ignoreFailures value configured for DistCpOptionSwitch.IGNORE_FAILURES
+  */
+ private void doTestIgnoreFailures(boolean ignoreFailures) {
+   try {
+     deleteState();
+     createSourceData();
+
+     FileSystem fs = cluster.getFileSystem();
+     CopyMapper copyMapper = new CopyMapper();
+     StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+     Mapper<Text, FileStatus, Text, Text>.Context context
+         = stubContext.getContext();
+
+     Configuration configuration = context.getConfiguration();
+     configuration.setBoolean(
+         DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), ignoreFailures);
+     configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+         true);
+     configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+         true);
+     copyMapper.setup(context);
+
+     for (Path path : pathList) {
+       final FileStatus fileStatus = fs.getFileStatus(path);
+       if (!fileStatus.isDirectory()) {
+         // Remove the file after its status was read so that the copy fails.
+         fs.delete(path, true);
+         copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+             fileStatus, context);
+       }
+     }
+     if (ignoreFailures) {
+       for (Text value : stubContext.getWriter().values()) {
+         Assert.assertTrue(value.toString() + " is not marked as FAIL",
+             value.toString().startsWith("FAIL:"));
+       }
+     }
+     Assert.assertTrue("There should have been an exception.", ignoreFailures);
+   } catch (Exception e) {
+     // Log before asserting: a failing assertion aborts the method, so
+     // anything placed after it would never run.
+     if (ignoreFailures) {
+       LOG.error("Exception encountered ", e);
+     } else {
+       LOG.info("Copy failed as expected", e);
+     }
+     Assert.assertTrue("Unexpected exception: " + e.getMessage(),
+         !ignoreFailures);
+   }
+ }
+
+ private static void deleteState() throws IOException {
+ pathList.clear();
+ nFiles = 0;
+ cluster.getFileSystem().delete(new Path(SOURCE_PATH), true);
+ cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
+ }
+
+ /** Runs the block-size/replication check with preservation on, then off. */
+ @Test
+ public void testPreserveBlockSizeAndReplication() {
+   for (boolean preserve : new boolean[] {true, false}) {
+     testPreserveBlockSizeAndReplicationImpl(preserve);
+   }
+ }
+
+ /**
+  * Copies the generated source tree through the mapper and compares block
+  * size and replication of every source file with its copy.
+  *
+  * @param preserve when true, BLOCKSIZE and REPLICATION are requested for
+  *                 preservation and both must match on the target; when
+  *                 false, nothing is preserved and both must differ
+  */
+ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve) {
+   try {
+
+     deleteState();
+     createSourceData();
+
+     FileSystem fs = cluster.getFileSystem();
+     CopyMapper copyMapper = new CopyMapper();
+     StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+     Mapper<Text, FileStatus, Text, Text>.Context context
+         = stubContext.getContext();
+
+     Configuration configuration = context.getConfiguration();
+     EnumSet<DistCpOptions.FileAttribute> fileAttributes
+         = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+     if (preserve) {
+       fileAttributes.add(DistCpOptions.FileAttribute.BLOCKSIZE);
+       fileAttributes.add(DistCpOptions.FileAttribute.REPLICATION);
+     }
+     configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+         DistCpUtils.packAttributes(fileAttributes));
+
+     copyMapper.setup(context);
+
+     for (Path path : pathList) {
+       final FileStatus fileStatus = fs.getFileStatus(path);
+       copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+           fileStatus, context);
+     }
+
+     // Check that block-size/replication are preserved exactly when requested.
+     for (Path path : pathList) {
+       final Path targetPath = new Path(path.toString()
+           .replaceAll(SOURCE_PATH, TARGET_PATH));
+       final FileStatus source = fs.getFileStatus(path);
+       final FileStatus target = fs.getFileStatus(targetPath);
+       if (source.isDirectory()) {
+         continue;
+       }
+       if (preserve) {
+         Assert.assertEquals("Block size not preserved for " + path,
+             source.getBlockSize(), target.getBlockSize());
+         Assert.assertEquals("Replication not preserved for " + path,
+             source.getReplication(), target.getReplication());
+       } else {
+         Assert.assertTrue("Block size unexpectedly preserved for " + path,
+             source.getBlockSize() != target.getBlockSize());
+         Assert.assertTrue("Replication unexpectedly preserved for " + path,
+             source.getReplication() != target.getReplication());
+       }
+     }
+   } catch (Exception e) {
+     // Log first: Assert.fail() throws, so nothing after it would run.
+     LOG.error("Exception encountered ", e);
+     Assert.fail("Unexpected exception: " + e.getMessage());
+   }
+ }
+
+ private static void changeUserGroup(String user, String group)
+ throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ FsPermission changedPermission = new FsPermission(
+ FsAction.ALL, FsAction.ALL, FsAction.ALL
+ );
+ for (Path path : pathList)
+ if (fs.isFile(path)) {
+ fs.setOwner(path, user, group);
+ fs.setPermission(path, changedPermission);
+ }
+ }
+
+ /**
+  * Checks how an existing target file is treated when a single file is
+  * copied. While the final target path is the parent directory, the existing
+  * target file must be skipped (its modification time stays the same). Once
+  * the final target path names the file itself, the file must not be skipped
+  * but overwritten (its modification time advances).
+  */
+ @Test
+ public void testSingleFileCopy() {
+   try {
+     deleteState();
+     touchFile(SOURCE_PATH + "/1");
+     Path sourceFilePath = pathList.get(0);
+     Path targetFilePath = new Path(sourceFilePath.toString().replaceAll(
+         SOURCE_PATH, TARGET_PATH));
+     touchFile(targetFilePath.toString());
+
+     FileSystem fs = cluster.getFileSystem();
+     CopyMapper copyMapper = new CopyMapper();
+     StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+     Mapper<Text, FileStatus, Text, Text>.Context context
+         = stubContext.getContext();
+
+     context.getConfiguration().set(
+         DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+         targetFilePath.getParent().toString()); // Parent directory.
+     copyMapper.setup(context);
+
+     final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
+
+     long before = fs.getFileStatus(targetFilePath).getModificationTime();
+     copyMapper.map(new Text(DistCpUtils.getRelativePath(
+         new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+     long after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+     Assert.assertEquals("File should have been skipped", before, after);
+
+     context.getConfiguration().set(
+         DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+         targetFilePath.toString()); // Specify the file path.
+     copyMapper.setup(context);
+
+     before = fs.getFileStatus(targetFilePath).getModificationTime();
+     try {
+       // Let the clock advance so that an overwrite yields a later timestamp.
+       Thread.sleep(2);
+     } catch (InterruptedException interrupted) {
+       // Do not swallow the interrupt; restore the flag for whoever set it.
+       Thread.currentThread().interrupt();
+     }
+     copyMapper.map(new Text(DistCpUtils.getRelativePath(
+         new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+     after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+     Assert.assertTrue("File should have been overwritten.", before < after);
+
+   } catch (Exception exception) {
+     // Log first: Assert.fail() throws, so nothing after it would run.
+     LOG.error("Exception encountered ", exception);
+     Assert.fail("Unexpected exception: " + exception.getMessage());
+   }
+ }
+
+ /** Runs the user/group/permission check with preservation on, then off. */
+ @Test
+ public void testPreserveUserGroup() {
+   for (boolean preserve : new boolean[] {true, false}) {
+     testPreserveUserGroupImpl(preserve);
+   }
+ }
+
+ /**
+  * Gives every source file a distinct owner, group and rwxrwxrwx permission,
+  * copies the tree through the mapper and compares those attributes between
+  * each source file and its copy.
+  *
+  * @param preserve when true, USER, GROUP and PERMISSION are requested for
+  *                 preservation and must match on the target; when false,
+  *                 nothing is preserved and all three must differ
+  */
+ private void testPreserveUserGroupImpl(boolean preserve) {
+   try {
+
+     deleteState();
+     createSourceData();
+     changeUserGroup("Michael", "Corleone");
+
+     FileSystem fs = cluster.getFileSystem();
+     CopyMapper copyMapper = new CopyMapper();
+     StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+     Mapper<Text, FileStatus, Text, Text>.Context context
+         = stubContext.getContext();
+
+     Configuration configuration = context.getConfiguration();
+     EnumSet<DistCpOptions.FileAttribute> fileAttributes
+         = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+     if (preserve) {
+       fileAttributes.add(DistCpOptions.FileAttribute.USER);
+       fileAttributes.add(DistCpOptions.FileAttribute.GROUP);
+       fileAttributes.add(DistCpOptions.FileAttribute.PERMISSION);
+     }
+
+     configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+         DistCpUtils.packAttributes(fileAttributes));
+     copyMapper.setup(context);
+
+     for (Path path : pathList) {
+       final FileStatus fileStatus = fs.getFileStatus(path);
+       copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+           fileStatus, context);
+     }
+
+     // Check that the user/group attributes are preserved
+     // (only) as necessary.
+     for (Path path : pathList) {
+       final Path targetPath = new Path(path.toString()
+           .replaceAll(SOURCE_PATH, TARGET_PATH));
+       final FileStatus source = fs.getFileStatus(path);
+       final FileStatus target = fs.getFileStatus(targetPath);
+       if (source.isDirectory()) {
+         continue;
+       }
+       if (preserve) {
+         Assert.assertEquals("Owner not preserved for " + path,
+             source.getOwner(), target.getOwner());
+         Assert.assertEquals("Group not preserved for " + path,
+             source.getGroup(), target.getGroup());
+         Assert.assertEquals("Permission not preserved for " + path,
+             source.getPermission(), target.getPermission());
+       } else {
+         Assert.assertFalse("Owner unexpectedly preserved for " + path,
+             source.getOwner().equals(target.getOwner()));
+         Assert.assertFalse("Group unexpectedly preserved for " + path,
+             source.getGroup().equals(target.getGroup()));
+         Assert.assertFalse("Permission unexpectedly preserved for " + path,
+             source.getPermission().equals(target.getPermission()));
+       }
+       // REPLICATION is never requested here, so it must differ in both modes.
+       Assert.assertTrue("Replication unexpectedly preserved for " + path,
+           source.getReplication() != target.getReplication());
+     }
+   } catch (Exception e) {
+     // Log first: Assert.fail() throws, so nothing after it would run.
+     LOG.error("Exception encountered ", e);
+     Assert.fail("Unexpected exception: " + e.getMessage());
+   }
+ }
+}
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+public class TestCopyOutputFormat {
+ private static final Log LOG = LogFactory.getLog(TestCopyOutputFormat.class);
+
+ @Test
+ public void testSetCommitDirectory() {
+ try {
+ Job job = Job.getInstance(new Configuration());
+ Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, "");
+ Assert.assertEquals(null, CopyOutputFormat.getCommitDirectory(job));
+
+ Path directory = new Path("/tmp/test");
+ CopyOutputFormat.setCommitDirectory(job, directory);
+ Assert.assertEquals(directory, CopyOutputFormat.getCommitDirectory(job));
+ Assert.assertEquals(directory.toString(), job.getConfiguration().
+ get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running test", e);
+ Assert.fail("Failed while testing for set Commit Directory");
+ }
+ }
+
+ @Test
+ public void testSetWorkingDirectory() {
+ try {
+ Job job = Job.getInstance(new Configuration());
+ Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+ Assert.assertEquals(null, CopyOutputFormat.getWorkingDirectory(job));
+
+ Path directory = new Path("/tmp/test");
+ CopyOutputFormat.setWorkingDirectory(job, directory);
+ Assert.assertEquals(directory, CopyOutputFormat.getWorkingDirectory(job));
+ Assert.assertEquals(directory.toString(), job.getConfiguration().
+ get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running test", e);
+ Assert.fail("Failed while testing for set Working Directory");
+ }
+ }
+
+ @Test
+ public void testGetOutputCommitter() {
+ try {
+ TaskAttemptContext context = new TaskAttemptContextImpl(new Configuration(),
+ new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
+ context.getConfiguration().set("mapred.output.dir", "/out");
+ Assert.assertTrue(new CopyOutputFormat().getOutputCommitter(context) instanceof CopyCommitter);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Unable to get output committer");
+ }
+ }
+
+ @Test
+ public void testCheckOutputSpecs() {
+ try {
+ OutputFormat outputFormat = new CopyOutputFormat();
+ Job job = Job.getInstance(new Configuration());
+ JobID jobID = new JobID("200707121733", 1);
+
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("No checking for invalid work/commit path");
+ } catch (IllegalStateException ignore) { }
+
+ CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("No checking for invalid commit path");
+ } catch (IllegalStateException ignore) { }
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+ CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("No checking for invalid work path");
+ } catch (IllegalStateException ignore) { }
+
+ CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+ CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ } catch (IllegalStateException ignore) {
+ Assert.fail("Output spec check failed.");
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing checkoutput specs", e);
+ Assert.fail("Checkoutput Spec failure");
+ } catch (InterruptedException e) {
+ LOG.error("Exception encountered while testing checkoutput specs", e);
+ Assert.fail("Checkoutput Spec failure");
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * Tests UniformSizeInputFormat against a single-datanode MiniDFSCluster that
+ * holds N_FILES randomly sized source files. The splits it produces are
+ * checked for contiguity and compared with those of the legacy DistCp
+ * splitting algorithm reproduced in legacyGetSplits().
+ */
+public class TestUniformSizeInputFormat {
+ private static final Log LOG
+ = LogFactory.getLog(TestUniformSizeInputFormat.class);
+
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE=1024;
+ private static final Random random = new Random();
+ // Sum of the sizes reported by createFile() for all source files.
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ /** Starts the mini cluster and creates the N_FILES source files on it. */
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+ }
+
+ /**
+ * Builds DistCp options that copy /tmp/source to /tmp/target on the mini
+ * cluster, limited to nMaps maps.
+ */
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ final DistCpOptions distCpOptions = new DistCpOptions(sourceList, targetPath);
+ distCpOptions.setMaxMaps(nMaps);
+ return distCpOptions;
+ }
+
+ /**
+ * Creates (overwriting) a zero-filled file whose random size is greater than
+ * fileSize and at most twice fileSize.
+ *
+ * @return the number of bytes written
+ */
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ // nextFloat() is in [0, 1), so the factor (1 - nextFloat()) is in (0, 1].
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ // NOTE(review): this also closes the FileSystem obtained from the cluster,
+ // and does so before the stream -- confirm that this is intended.
+ IOUtils.cleanup(null, fileSystem, outputStream);
+ }
+ }
+
+ /** Shuts the mini cluster down after all tests have run. */
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ /**
+ * Builds a copy listing for nMaps maps, computes the splits and verifies that
+ * they are contiguous, start at the same offsets as the legacy splits, and
+ * that the files read through them add up to totalFileSize.
+ */
+ public void testGetSplits(int nMaps) throws Exception {
+ DistCpOptions options = getOptions(nMaps);
+ Configuration configuration = new Configuration();
+ configuration.set("mapred.map.tasks",
+ String.valueOf(options.getMaxMaps()));
+ Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/testGetSplits_1/fileList.seq");
+ CopyListing.getCopyListing(configuration, CREDENTIALS, options).
+ buildListing(listFile, options);
+
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
+ UniformSizeInputFormat uniformSizeInputFormat = new UniformSizeInputFormat();
+ List<InputSplit> splits
+ = uniformSizeInputFormat.getSplits(jobContext);
+
+ List<InputSplit> legacySplits = legacyGetSplits(listFile, nMaps);
+
+ int sizePerMap = totalFileSize/nMaps;
+
+ checkSplits(listFile, splits);
+ checkAgainstLegacy(splits, legacySplits);
+
+ int doubleCheckedTotalSize = 0;
+ int previousSplitSize = -1;
+ for (int i=0; i<splits.size(); ++i) {
+ InputSplit split = splits.get(i);
+ int currentSplitSize = 0;
+ RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
+ split, null);
+ StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+ recordReader, 0);
+ final TaskAttemptContext taskAttemptContext
+ = stubContext.getContext();
+ recordReader.initialize(split, taskAttemptContext);
+ // Add up the lengths of all files listed in this split.
+ while (recordReader.nextKeyValue()) {
+ Path sourcePath = recordReader.getCurrentValue().getPath();
+ FileSystem fs = sourcePath.getFileSystem(configuration);
+ FileStatus fileStatus [] = fs.listStatus(sourcePath);
+ Assert.assertEquals(fileStatus.length, 1);
+ currentSplitSize += fileStatus[0].getLen();
+ }
+ // NOTE(review): previousSplitSize is never reassigned, so the first
+ // condition is always true and this assertion cannot fail. Updating it to
+ // currentSplitSize would activate the 10% uniformity check -- confirm that
+ // the splits actually satisfy it before doing so.
+ Assert.assertTrue(
+ previousSplitSize == -1
+ || Math.abs(currentSplitSize - previousSplitSize) < 0.1*sizePerMap
+ || i == splits.size()-1);
+
+ doubleCheckedTotalSize += currentSplitSize;
+ }
+
+ Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+ }
+
+ // From
+ // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
+ //
+ // Walks the listing file and cuts a new split whenever adding the next file
+ // would push the accumulated file length past totalFileSize / numSplits.
+ // Split offsets and lengths are byte positions within the listing file.
+ private List<InputSplit> legacyGetSplits(Path listFile, int numSplits)
+ throws IOException {
+
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus srcst = fs.getFileStatus(listFile);
+ Configuration conf = fs.getConf();
+
+ ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+ FileStatus value = new FileStatus();
+ Text key = new Text();
+ final long targetsize = totalFileSize / numSplits;
+ // pos: start of the current split; last: reader position after the previous
+ // record; acc: file bytes gathered in the current split; cbrem: bytes of the
+ // listing file not yet assigned to a split.
+ long pos = 0L;
+ long last = 0L;
+ long acc = 0L;
+ long cbrem = srcst.getLen();
+ SequenceFile.Reader sl = null;
+
+ LOG.info("Average bytes per map: " + targetsize +
+ ", Number of maps: " + numSplits + ", total size: " + totalFileSize);
+
+ try {
+ sl = new SequenceFile.Reader(conf, SequenceFile.Reader.file(listFile));
+ for (; sl.next(key, value); last = sl.getPosition()) {
+ // if adding this split would put this split past the target size,
+ // cut the last split and put this next file in the next split.
+ if (acc + value.getLen() > targetsize && acc != 0) {
+ long splitsize = last - pos;
+ FileSplit fileSplit = new FileSplit(listFile, pos, splitsize, null);
+ LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + splitsize);
+ splits.add(fileSplit);
+ cbrem -= splitsize;
+ pos = last;
+ acc = 0L;
+ }
+ acc += value.getLen();
+ }
+ }
+ finally {
+ IOUtils.closeStream(sl);
+ }
+ // Whatever is left of the listing file becomes the final split.
+ if (cbrem != 0) {
+ FileSplit fileSplit = new FileSplit(listFile, pos, cbrem, null);
+ LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + cbrem);
+ splits.add(fileSplit);
+ }
+
+ return splits;
+ }
+
+ /**
+ * Asserts that the splits cover the listing file contiguously from offset 0
+ * and that no record can be read past the end of the last split.
+ */
+ private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+ long lastEnd = 0;
+
+ //Verify if each split's start is matching with the previous end and
+ //we are not missing anything
+ for (InputSplit split : splits) {
+ FileSplit fileSplit = (FileSplit) split;
+ long start = fileSplit.getStart();
+ Assert.assertEquals(lastEnd, start);
+ lastEnd = start + fileSplit.getLength();
+ }
+
+ //Verify there is nothing more to read from the input file
+ SequenceFile.Reader reader
+ = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
+ SequenceFile.Reader.file(listFile));
+
+ try {
+ reader.seek(lastEnd);
+ FileStatus srcFileStatus = new FileStatus();
+ Text srcRelPath = new Text();
+ Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ /**
+ * Asserts that both lists hold the same number of splits and that splits at
+ * the same index start at the same offset. Lengths are not compared.
+ */
+ private void checkAgainstLegacy(List<InputSplit> splits,
+ List<InputSplit> legacySplits)
+ throws IOException, InterruptedException {
+
+ Assert.assertEquals(legacySplits.size(), splits.size());
+ for (int index = 0; index < splits.size(); index++) {
+ FileSplit fileSplit = (FileSplit) splits.get(index);
+ FileSplit legacyFileSplit = (FileSplit) legacySplits.get(index);
+ Assert.assertEquals(fileSplit.getStart(), legacyFileSplit.getStart());
+ }
+ }
+
+ /** Runs the split check for 9 maps and then for 1 to N_FILES - 1 maps. */
+ @Test
+ public void testGetSplits() throws Exception {
+ testGetSplits(9);
+ for (int i=1; i<N_FILES; ++i)
+ testGetSplits(i);
+ }
+}
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.security.Credentials;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests DynamicInputFormat against a single-datanode MiniDFSCluster holding
+ * N_FILES empty source files: every listed file must be handed out exactly
+ * once across all splits, with monotonically increasing reader progress.
+ */
+public class TestDynamicInputFormat {
+  private static final Log LOG = LogFactory.getLog(TestDynamicInputFormat.class);
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 1000;
+  private static final int NUM_SPLITS = 7;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  // Fully qualified paths of all files created by createFile().
+  private static List<String> expectedFilePaths = new ArrayList<String>(N_FILES);
+
+  /** Starts the mini cluster and creates the N_FILES source files on it. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(getConfigurationForCluster())
+        .numDataNodes(1).format(true).build();
+
+    for (int i = 0; i < N_FILES; ++i) {
+      createFile("/tmp/source/" + String.valueOf(i));
+    }
+  }
+
+  /**
+   * Builds the cluster configuration and points the "test.build.data" system
+   * property and "hadoop.log.dir" at directories below target/tmp.
+   */
+  private static Configuration getConfigurationForCluster() {
+    Configuration configuration = new Configuration();
+    System.setProperty("test.build.data",
+        "target/tmp/build/TEST_DYNAMIC_INPUT_FORMAT/data");
+    configuration.set("hadoop.log.dir", "target/tmp");
+    LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
+    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+    return configuration;
+  }
+
+  /**
+   * Builds DistCp options that copy /tmp/source to /tmp/target on the mini
+   * cluster, limited to NUM_SPLITS maps.
+   */
+  private static DistCpOptions getOptions() throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    DistCpOptions options = new DistCpOptions(sourceList, targetPath);
+    options.setMaxMaps(NUM_SPLITS);
+    return options;
+  }
+
+  /**
+   * Creates (overwriting) an empty file and records its qualified path in
+   * expectedFilePaths.
+   */
+  private static void createFile(String path) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      expectedFilePaths.add(fileSystem.listStatus(
+          new Path(path))[0].getPath().toString());
+    }
+    finally {
+      // NOTE(review): this also closes the FileSystem obtained from the
+      // cluster, and does so before the stream -- confirm this is intended.
+      IOUtils.cleanup(null, fileSystem, outputStream);
+    }
+  }
+
+  /** Shuts the mini cluster down after all tests have run. */
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  /**
+   * Builds a copy listing, reads every split with its own record reader and
+   * task id, and verifies that each record is an expected file, that progress
+   * stays within [0, 1] without decreasing and ends at 1, and that the number
+   * of records read equals the number of files created.
+   */
+  @Test
+  public void testGetSplits() throws Exception {
+    DistCpOptions options = getOptions();
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks",
+        String.valueOf(options.getMaxMaps()));
+    CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
+        new Path(cluster.getFileSystem().getUri().toString()
+            + "/tmp/testDynInputFormat/fileList.seq"), options);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    DynamicInputFormat<Text, FileStatus> inputFormat =
+        new DynamicInputFormat<Text, FileStatus>();
+    List<InputSplit> splits = inputFormat.getSplits(jobContext);
+
+    int nFiles = 0;
+    int taskId = 0;
+
+    for (InputSplit split : splits) {
+      RecordReader<Text, FileStatus> recordReader =
+          inputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+          recordReader, taskId);
+      final TaskAttemptContext taskAttemptContext
+          = stubContext.getContext();
+
+      // Initialize the reader with the split it was created for, rather than
+      // always with the first split of the list.
+      recordReader.initialize(split, taskAttemptContext);
+      float previousProgressValue = 0f;
+      while (recordReader.nextKeyValue()) {
+        FileStatus fileStatus = recordReader.getCurrentValue();
+        String source = fileStatus.getPath().toString();
+        // One line per record: keep it out of stdout and at debug level.
+        LOG.debug(source);
+        Assert.assertTrue(expectedFilePaths.contains(source));
+        final float progress = recordReader.getProgress();
+        Assert.assertTrue(progress >= previousProgressValue);
+        Assert.assertTrue(progress >= 0.0f);
+        Assert.assertTrue(progress <= 1.0f);
+        previousProgressValue = progress;
+        ++nFiles;
+      }
+      Assert.assertTrue(recordReader.getProgress() == 1.0f);
+
+      ++taskId;
+    }
+
+    Assert.assertEquals(expectedFilePaths.size(), nFiles);
+  }
+
+  /** Pins the split ratio returned for a few (maps, chunks) combinations. */
+  @Test
+  public void testGetSplitRatio() throws Exception {
+    Assert.assertEquals(1, DynamicInputFormat.getSplitRatio(1, 1000000000));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
+    Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+  }
+}
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.Stack;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class TestDistCpUtils {
+ private static final Log LOG = LogFactory.getLog(TestDistCpUtils.class);
+
+ private static final Configuration config = new Configuration();
+ private static MiniDFSCluster cluster;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+ .build();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetRelativePathRoot() {
+ Path root = new Path("/tmp/abc");
+ Path child = new Path("/tmp/abc/xyz/file");
+ Assert.assertEquals(DistCpUtils.getRelativePath(root, child), "/xyz/file");
+
+ root = new Path("/");
+ child = new Path("/a");
+ Assert.assertEquals(DistCpUtils.getRelativePath(root, child), "/a");
+ }
+
+ @Test
+ public void testPackAttributes() {
+ EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+ Assert.assertEquals(DistCpUtils.packAttributes(attributes), "");
+
+ attributes.add(FileAttribute.REPLICATION);
+ Assert.assertEquals(DistCpUtils.packAttributes(attributes), "R");
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("R"));
+
+ attributes.add(FileAttribute.BLOCKSIZE);
+ Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RB");
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RB"));
+
+ attributes.add(FileAttribute.USER);
+ Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBU");
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBU"));
+
+ attributes.add(FileAttribute.GROUP);
+ Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBUG");
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUG"));
+
+ attributes.add(FileAttribute.PERMISSION);
+ Assert.assertEquals(DistCpUtils.packAttributes(attributes), "RBUGP");
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUGP"));
+ }
+
+ @Test
+ public void testPreserve() {
+ try {
+ FileSystem fs = FileSystem.get(config);
+ EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+
+
+ Path path = new Path("/tmp/abc");
+ Path src = new Path("/tmp/src");
+ fs.mkdirs(path);
+ fs.mkdirs(src);
+ FileStatus srcStatus = fs.getFileStatus(src);
+
+ FsPermission noPerm = new FsPermission((short) 0);
+ fs.setPermission(path, noPerm);
+ fs.setOwner(path, "nobody", "nobody");
+
+ DistCpUtils.preserve(fs, path, srcStatus, attributes);
+ FileStatus target = fs.getFileStatus(path);
+ Assert.assertEquals(target.getPermission(), noPerm);
+ Assert.assertEquals(target.getOwner(), "nobody");
+ Assert.assertEquals(target.getGroup(), "nobody");
+
+ attributes.add(FileAttribute.PERMISSION);
+ DistCpUtils.preserve(fs, path, srcStatus, attributes);
+ target = fs.getFileStatus(path);
+ Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
+ Assert.assertEquals(target.getOwner(), "nobody");
+ Assert.assertEquals(target.getGroup(), "nobody");
+
+ attributes.add(FileAttribute.GROUP);
+ attributes.add(FileAttribute.USER);
+ DistCpUtils.preserve(fs, path, srcStatus, attributes);
+ target = fs.getFileStatus(path);
+ Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
+ Assert.assertEquals(target.getOwner(), srcStatus.getOwner());
+ Assert.assertEquals(target.getGroup(), srcStatus.getGroup());
+
+ fs.delete(path, true);
+ fs.delete(src, true);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Preserve test failure");
+ }
+ }
+
+ private static Random rand = new Random();
+
+ public static String createTestSetup(FileSystem fs) throws IOException {
+ return createTestSetup("/tmp1", fs, FsPermission.getDefault());
+ }
+
+ public static String createTestSetup(FileSystem fs,
+ FsPermission perm) throws IOException {
+ return createTestSetup("/tmp1", fs, perm);
+ }
+
+ public static String createTestSetup(String baseDir,
+ FileSystem fs,
+ FsPermission perm) throws IOException {
+ String base = getBase(baseDir);
+ fs.mkdirs(new Path(base + "/newTest/hello/world1"));
+ fs.mkdirs(new Path(base + "/newTest/hello/world2/newworld"));
+ fs.mkdirs(new Path(base + "/newTest/hello/world3/oldworld"));
+ fs.setPermission(new Path(base + "/newTest"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world1"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world2"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world2/newworld"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world3"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world3/oldworld"), perm);
+ createFile(fs, base + "/newTest/1");
+ createFile(fs, base + "/newTest/hello/2");
+ createFile(fs, base + "/newTest/hello/world3/oldworld/3");
+ createFile(fs, base + "/newTest/hello/world2/4");
+ return base;
+ }
+
+ private static String getBase(String base) {
+ String location = String.valueOf(rand.nextLong());
+ return base + "/" + location;
+ }
+
+ public static void delete(FileSystem fs, String path) {
+ try {
+ if (fs != null) {
+ if (path != null) {
+ fs.delete(new Path(path), true);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception encountered ", e);
+ }
+ }
+
+ public static void createFile(FileSystem fs, String filePath) throws IOException {
+ OutputStream out = fs.create(new Path(filePath));
+ IOUtils.closeStream(out);
+ }
+
+ public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase)
+ throws IOException {
+ Path base = new Path(targetBase);
+
+ Stack<Path> stack = new Stack<Path>();
+ stack.push(base);
+ while (!stack.isEmpty()) {
+ Path file = stack.pop();
+ if (!fs.exists(file)) continue;
+ FileStatus[] fStatus = fs.listStatus(file);
+ if (fStatus == null || fStatus.length == 0) continue;
+
+ for (FileStatus status : fStatus) {
+ if (status.isDirectory()) {
+ stack.push(status.getPath());
+ }
+ Assert.assertTrue(fs.exists(new Path(sourceBase + "/" +
+ DistCpUtils.getRelativePath(new Path(targetBase), status.getPath()))));
+ }
+ }
+ return true;
+ }
+}
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java?rev=1236049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java Thu Jan 26 06:50:00 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that RetriableCommand.execute gives up or succeeds depending on how
+ * many attempts the wrapped operation needs.
+ */
+public class TestRetriableCommand {
+
+  /**
+   * Command whose doExecute throws on every call until it has been called
+   * succeedAfter times, and returns 0 from then on.
+   */
+  private static class MyRetriableCommand extends RetriableCommand {
+
+    /** Ordinal of the first doExecute call that succeeds. */
+    private final int succeedAfter;
+    /** Number of doExecute calls made so far. */
+    private int retryCount = 0;
+
+    /**
+     * Uses the retry policy that RetriableCommand applies when none is
+     * given.
+     */
+    public MyRetriableCommand(int succeedAfter) {
+      super("MyRetriableCommand");
+      this.succeedAfter = succeedAfter;
+    }
+
+    /** Uses the supplied retry policy. */
+    public MyRetriableCommand(int succeedAfter, RetryPolicy retryPolicy) {
+      super("MyRetriableCommand", retryPolicy);
+      this.succeedAfter = succeedAfter;
+    }
+
+    @Override
+    protected Object doExecute(Object... arguments) throws Exception {
+      if (++retryCount < succeedAfter) {
+        throw new Exception("Transient failure#" + retryCount);
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Expects a command needing five attempts to fail and one needing three
+   * to succeed when no policy is given, and expects the five-attempt
+   * command to succeed under an explicit fixed-sleep policy built with a
+   * maximum count of 5.
+   */
+  @Test
+  public void testRetriableCommand() {
+    try {
+      new MyRetriableCommand(5).execute(0);
+      // Assert.fail throws AssertionError, which the catch below does not
+      // intercept because it is not an Exception.
+      Assert.fail("Command needing 5 attempts should have failed "
+          + "without an explicit retry policy");
+    } catch (Exception e) {
+      // Expected: the command gave up before doExecute succeeded.
+    }
+
+    try {
+      new MyRetriableCommand(3).execute(0);
+    } catch (Exception e) {
+      Assert.fail("Command needing 3 attempts should have succeeded "
+          + "without an explicit retry policy: " + e);
+    }
+
+    try {
+      new MyRetriableCommand(5, RetryPolicies.
+          retryUpToMaximumCountWithFixedSleep(5, 0, TimeUnit.MILLISECONDS))
+          .execute(0);
+    } catch (Exception e) {
+      Assert.fail("Command needing 5 attempts should have succeeded "
+          + "with a fixed-sleep policy of maximum count 5: " + e);
+    }
+  }
+}