Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC
svn commit: r1236045 [5/5] - in /hadoop/common/trunk: hadoop-project/
hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/
hadoop-tools/hadoop-distcp/src/main/
hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,826 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+public class TestCopyMapper {
+ private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
+ private static List<Path> pathList = new ArrayList<Path>();
+ private static int nFiles = 0;
+ private static final int FILE_SIZE = 1024;
+
+ private static MiniDFSCluster cluster;
+
+ private static final String SOURCE_PATH = "/tmp/source";
+ private static final String TARGET_PATH = "/tmp/target";
+
+ private static Configuration configuration;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ configuration = getConfigurationForCluster();
+ cluster = new MiniDFSCluster.Builder(configuration)
+ .numDataNodes(1)
+ .format(true)
+ .build();
+ }
+
+ private static Configuration getConfigurationForCluster() throws IOException {
+ Configuration configuration = new Configuration();
+ System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+ configuration.set("hadoop.log.dir", "target/tmp");
+ LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
+ LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+ return configuration;
+ }
+
+ private static Configuration getConfiguration() throws IOException {
+ Configuration configuration = getConfigurationForCluster();
+ final FileSystem fs = cluster.getFileSystem();
+ Path workPath = new Path(TARGET_PATH)
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+ workPath.toString());
+ configuration.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+ workPath.toString());
+ configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+ false);
+ configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+ true);
+ configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
+ true);
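+ // "b" and "r" request preservation of block-size and replication
+ // (cf. DistCpUtils.packAttributes / unpackAttributes).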
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+ "br");
+ return configuration;
+ }
+
+ private static void createSourceData() throws Exception {
+ mkdirs(SOURCE_PATH + "/1");
+ mkdirs(SOURCE_PATH + "/2");
+ mkdirs(SOURCE_PATH + "/2/3/4");
+ mkdirs(SOURCE_PATH + "/2/3");
+ mkdirs(SOURCE_PATH + "/5");
+ touchFile(SOURCE_PATH + "/5/6");
+ mkdirs(SOURCE_PATH + "/7");
+ mkdirs(SOURCE_PATH + "/7/8");
+ touchFile(SOURCE_PATH + "/7/8/9");
+ }
+
+ private static void mkdirs(String path) throws Exception {
+ FileSystem fileSystem = cluster.getFileSystem();
+ final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
+ fileSystem.getWorkingDirectory());
+ pathList.add(qualifiedPath);
+ fileSystem.mkdirs(qualifiedPath);
+ }
+
+ private static void touchFile(String path) throws Exception {
+ FileSystem fs;
+ DataOutputStream outputStream = null;
+ try {
+ fs = cluster.getFileSystem();
+ final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
+ fs.getWorkingDirectory());
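+ // Create files with double the default block-size and replication, so that
+ // preservation of these attributes is observable at the copy target.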
+ final long blockSize = fs.getDefaultBlockSize() * 2;
+ outputStream = fs.create(qualifiedPath, true, 0,
+ (short)(fs.getDefaultReplication()*2),
+ blockSize);
+ outputStream.write(new byte[FILE_SIZE]);
+ pathList.add(qualifiedPath);
+ ++nFiles;
+
+ FileStatus fileStatus = fs.getFileStatus(qualifiedPath);
+ LOG.debug("Block size: " + fileStatus.getBlockSize());
+ LOG.debug("Replication: " + fileStatus.getReplication());
+ }
+ finally {
+ IOUtils.cleanup(null, outputStream);
+ }
+ }
+
+ @Test
+ public void testRun() {
+ try {
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+ copyMapper.setup(context);
+
+ for (Path path: pathList) {
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ fs.getFileStatus(path), context);
+ }
+
+ // Check that the maps worked.
+ for (Path path : pathList) {
+ final Path targetPath = new Path(path.toString()
+ .replaceAll(SOURCE_PATH, TARGET_PATH));
+ Assert.assertTrue(fs.exists(targetPath));
+ Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+ Assert.assertEquals(fs.getFileStatus(path).getReplication(),
+ fs.getFileStatus(targetPath).getReplication());
+ Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
+ fs.getFileStatus(targetPath).getBlockSize());
+ Assert.assertTrue(!fs.isFile(targetPath) ||
+ fs.getFileChecksum(targetPath).equals(
+ fs.getFileChecksum(path)));
+ }
+
+ Assert.assertEquals(pathList.size(),
+ stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
+ Assert.assertEquals(nFiles * FILE_SIZE,
+ stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+
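+ // A second pass over the same, unchanged paths should skip every file.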
+ testCopyingExistingFiles(fs, copyMapper, context);
+ for (Text value : stubContext.getWriter().values()) {
+ Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Unexpected exception: ", e);
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
+ Mapper<Text, FileStatus, Text, Text>.Context context) {
+
+ try {
+ for (Path path : pathList) {
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ fs.getFileStatus(path), context);
+ }
+
+ Assert.assertEquals(nFiles,
+ context.getCounter(CopyMapper.Counter.SKIP).getValue());
+ }
+ catch (Exception exception) {
+ Assert.fail("Caught unexpected exception: " + exception.getMessage());
+ }
+ }
+
+ @Test
+ public void testMakeDirFailure() {
+ try {
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
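+ // Point the work path at an unusable location, so that directory creation fails.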
+ String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
+ configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+ workPath);
+ copyMapper.setup(context);
+
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
+ fs.getFileStatus(pathList.get(0)), context);
+
+ Assert.fail("There should have been an exception.");
+ }
+ catch (Exception ignore) {
+ }
+ }
+
+ @Test
+ public void testIgnoreFailures() {
+ doTestIgnoreFailures(true);
+ doTestIgnoreFailures(false);
+ }
+
+ @Test
+ public void testDirToFile() {
+ try {
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ mkdirs(SOURCE_PATH + "/src/file");
+ touchFile(TARGET_PATH + "/src/file");
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+ }
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreserve() {
+ try {
+ deleteState();
+ createSourceData();
+
+ UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+ final CopyMapper copyMapper = new CopyMapper();
+
+ final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
+ doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+ @Override
+ public Mapper<Text, FileStatus, Text, Text>.Context run() {
+ try {
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ return stubContext.getContext();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+ EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+ context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+ DistCpUtils.packAttributes(preserveStatus));
+
+ touchFile(SOURCE_PATH + "/src/file");
+ mkdirs(TARGET_PATH);
+ cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)0777));
+
+ final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+ @Override
+ public FileSystem run() {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ throw new RuntimeException("Test ought to fail here");
+ }
+ }
+ });
+
+ tmpUser.doAs(new PrivilegedAction<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ Assert.fail("Expected copy to fail");
+ } catch (AccessControlException e) {
+ // Expected: the copy must fail with an AccessControlException.
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCopyReadableFiles() {
+ try {
+ deleteState();
+ createSourceData();
+
+ UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+ final CopyMapper copyMapper = new CopyMapper();
+
+ final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
+ doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
+ @Override
+ public Mapper<Text, FileStatus, Text, Text>.Context run() {
+ try {
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ return stubContext.getContext();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ touchFile(SOURCE_PATH + "/src/file");
+ mkdirs(TARGET_PATH);
+ cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+ new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+ cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)0777));
+
+ final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+ @Override
+ public FileSystem run() {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ throw new RuntimeException("Test ought to fail here");
+ }
+ }
+ });
+
+ tmpUser.doAs(new PrivilegedAction<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSkipCopyNoPerms() {
+ try {
+ deleteState();
+ createSourceData();
+
+ UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+ final CopyMapper copyMapper = new CopyMapper();
+
+ final StubContext stubContext = tmpUser.
+ doAs(new PrivilegedAction<StubContext>() {
+ @Override
+ public StubContext run() {
+ try {
+ return new StubContext(getConfiguration(), null, 0);
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+ EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+ EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+ context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+ DistCpUtils.packAttributes(preserveStatus));
+
+ touchFile(SOURCE_PATH + "/src/file");
+ touchFile(TARGET_PATH + "/src/file");
+ cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+ new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+ cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+ new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+ final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+ @Override
+ public FileSystem run() {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ throw new RuntimeException("Test ought to fail here");
+ }
+ }
+ });
+
+ tmpUser.doAs(new PrivilegedAction<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ Assert.assertEquals(1, stubContext.getWriter().values().size());
+ Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
+ Assert.assertTrue(stubContext.getWriter().values().get(0).toString().
+ contains(SOURCE_PATH + "/src/file"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testFailCopyWithAccessControlException() {
+ try {
+ deleteState();
+ createSourceData();
+
+ UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
+
+ final CopyMapper copyMapper = new CopyMapper();
+
+ final StubContext stubContext = tmpUser.
+ doAs(new PrivilegedAction<StubContext>() {
+ @Override
+ public StubContext run() {
+ try {
+ return new StubContext(getConfiguration(), null, 0);
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ EnumSet<DistCpOptions.FileAttribute> preserveStatus =
+ EnumSet.allOf(DistCpOptions.FileAttribute.class);
+
+ final Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+ DistCpUtils.packAttributes(preserveStatus));
+
+ touchFile(SOURCE_PATH + "/src/file");
+ OutputStream out = cluster.getFileSystem().create(new Path(TARGET_PATH + "/src/file"));
+ out.write("hello world".getBytes());
+ out.close();
+ cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
+ new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+ cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
+ new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+ final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
+ @Override
+ public FileSystem run() {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ throw new RuntimeException("Test ought to fail here");
+ }
+ }
+ });
+
+ tmpUser.doAs(new PrivilegedAction<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ Assert.fail("Didn't expect the file to be copied");
+ } catch (AccessControlException ignore) {
+ } catch (Exception e) {
+ if (e.getCause() == null || !(e.getCause() instanceof AccessControlException)) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testFileToDir() {
+ try {
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ touchFile(SOURCE_PATH + "/src/file");
+ mkdirs(TARGET_PATH + "/src/file");
+ try {
+ copyMapper.setup(context);
+ copyMapper.map(new Text("/src/file"),
+ fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ context);
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
+ }
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ private void doTestIgnoreFailures(boolean ignoreFailures) {
+ try {
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
+ configuration.setBoolean(
+ DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(),ignoreFailures);
+ configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
+ true);
+ configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
+ true);
+ copyMapper.setup(context);
+
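+ // Delete each source file just before mapping it, so that every copy fails:
+ // with ignoreFailures set, the mapper should record "FAIL:" and continue;
+ // otherwise, it should throw.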
+ for (Path path : pathList) {
+ final FileStatus fileStatus = fs.getFileStatus(path);
+ if (!fileStatus.isDirectory()) {
+ fs.delete(path, true);
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ fileStatus, context);
+ }
+ }
+ if (ignoreFailures) {
+ for (Text value : stubContext.getWriter().values()) {
+ Assert.assertTrue(value.toString() + " is not marked as failed", value.toString().startsWith("FAIL:"));
+ }
+ }
+ Assert.assertTrue("There should have been an exception.", ignoreFailures);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue("Unexpected exception: " + e.getMessage(),
+ !ignoreFailures);
+ }
+ }
+
+ private static void deleteState() throws IOException {
+ pathList.clear();
+ nFiles = 0;
+ cluster.getFileSystem().delete(new Path(SOURCE_PATH), true);
+ cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
+ }
+
+ @Test
+ public void testPreserveBlockSizeAndReplication() {
+ testPreserveBlockSizeAndReplicationImpl(true);
+ testPreserveBlockSizeAndReplicationImpl(false);
+ }
+
+ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
+ try {
+
+ deleteState();
+ createSourceData();
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes
+ = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+ if (preserve) {
+ fileAttributes.add(DistCpOptions.FileAttribute.BLOCKSIZE);
+ fileAttributes.add(DistCpOptions.FileAttribute.REPLICATION);
+ }
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+ DistCpUtils.packAttributes(fileAttributes));
+
+ copyMapper.setup(context);
+
+ for (Path path : pathList) {
+ final FileStatus fileStatus = fs.getFileStatus(path);
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ fileStatus, context);
+ }
+
+ // Check that block-size and replication are preserved (only) when requested.
+ for (Path path : pathList) {
+ final Path targetPath = new Path(path.toString()
+ .replaceAll(SOURCE_PATH, TARGET_PATH));
+ final FileStatus source = fs.getFileStatus(path);
+ final FileStatus target = fs.getFileStatus(targetPath);
+ if (!source.isDirectory() ) {
+ Assert.assertTrue(preserve ||
+ source.getBlockSize() != target.getBlockSize());
+ Assert.assertTrue(preserve ||
+ source.getReplication() != target.getReplication());
+ Assert.assertTrue(!preserve ||
+ source.getBlockSize() == target.getBlockSize());
+ Assert.assertTrue(!preserve ||
+ source.getReplication() == target.getReplication());
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ private static void changeUserGroup(String user, String group)
+ throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ FsPermission changedPermission = new FsPermission(
+ FsAction.ALL, FsAction.ALL, FsAction.ALL
+ );
+ for (Path path : pathList)
+ if (fs.isFile(path)) {
+ fs.setOwner(path, user, group);
+ fs.setPermission(path, changedPermission);
+ }
+ }
+
+ /**
+ * If a single file is being copied to a location where the file (of the same
+ * name) already exists, then the file shouldn't be skipped.
+ */
+ @Test
+ public void testSingleFileCopy() {
+ try {
+ deleteState();
+ touchFile(SOURCE_PATH + "/1");
+ Path sourceFilePath = pathList.get(0);
+ Path targetFilePath = new Path(sourceFilePath.toString().replaceAll(
+ SOURCE_PATH, TARGET_PATH));
+ touchFile(targetFilePath.toString());
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ context.getConfiguration().set(
+ DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+ targetFilePath.getParent().toString()); // Parent directory.
+ copyMapper.setup(context);
+
+ final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
+
+ long before = fs.getFileStatus(targetFilePath).getModificationTime();
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(
+ new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+ long after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+ Assert.assertTrue("File should have been skipped", before == after);
+
+ context.getConfiguration().set(
+ DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+ targetFilePath.toString()); // Specify the file path.
+ copyMapper.setup(context);
+
+ before = fs.getFileStatus(targetFilePath).getModificationTime();
+ try { Thread.sleep(2); } catch (Throwable ignore) {}
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(
+ new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
+ after = fs.getFileStatus(targetFilePath).getModificationTime();
+
+ Assert.assertTrue("File should have been overwritten.", before < after);
+
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ Assert.fail("Unexpected exception: " + exception.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreserveUserGroup() {
+ testPreserveUserGroupImpl(true);
+ testPreserveUserGroupImpl(false);
+ }
+
+ private void testPreserveUserGroupImpl(boolean preserve){
+ try {
+
+ deleteState();
+ createSourceData();
+ changeUserGroup("Michael", "Corleone");
+
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, FileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes
+ = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+ if (preserve) {
+ fileAttributes.add(DistCpOptions.FileAttribute.USER);
+ fileAttributes.add(DistCpOptions.FileAttribute.GROUP);
+ fileAttributes.add(DistCpOptions.FileAttribute.PERMISSION);
+ }
+
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+ DistCpUtils.packAttributes(fileAttributes));
+ copyMapper.setup(context);
+
+ for (Path path : pathList) {
+ final FileStatus fileStatus = fs.getFileStatus(path);
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ fileStatus, context);
+ }
+
+ // Check that the user/group attributes are preserved
+ // (only) as necessary.
+ for (Path path : pathList) {
+ final Path targetPath = new Path(path.toString()
+ .replaceAll(SOURCE_PATH, TARGET_PATH));
+ final FileStatus source = fs.getFileStatus(path);
+ final FileStatus target = fs.getFileStatus(targetPath);
+ if (!source.isDirectory()) {
+ Assert.assertTrue(!preserve || source.getOwner().equals(target.getOwner()));
+ Assert.assertTrue(!preserve || source.getGroup().equals(target.getGroup()));
+ Assert.assertTrue(!preserve || source.getPermission().equals(target.getPermission()));
+ Assert.assertTrue( preserve || !source.getOwner().equals(target.getOwner()));
+ Assert.assertTrue( preserve || !source.getGroup().equals(target.getGroup()));
+ Assert.assertTrue( preserve || !source.getPermission().equals(target.getPermission()));
+ Assert.assertTrue(source.isDirectory() ||
+ source.getReplication() != target.getReplication());
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+public class TestCopyOutputFormat {
+ private static final Log LOG = LogFactory.getLog(TestCopyOutputFormat.class);
+
+ @Test
+ public void testSetCommitDirectory() {
+ try {
+ Job job = Job.getInstance(new Configuration());
+ Assert.assertNull(CopyOutputFormat.getCommitDirectory(job));
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, "");
+ Assert.assertNull(CopyOutputFormat.getCommitDirectory(job));
+
+ Path directory = new Path("/tmp/test");
+ CopyOutputFormat.setCommitDirectory(job, directory);
+ Assert.assertEquals(directory, CopyOutputFormat.getCommitDirectory(job));
+ Assert.assertEquals(directory.toString(), job.getConfiguration().
+ get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running test", e);
+ Assert.fail("Failed while testing setCommitDirectory");
+ }
+ }
+
+ @Test
+ public void testSetWorkingDirectory() {
+ try {
+ Job job = Job.getInstance(new Configuration());
+ Assert.assertNull(CopyOutputFormat.getWorkingDirectory(job));
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+ Assert.assertNull(CopyOutputFormat.getWorkingDirectory(job));
+
+ Path directory = new Path("/tmp/test");
+ CopyOutputFormat.setWorkingDirectory(job, directory);
+ Assert.assertEquals(directory, CopyOutputFormat.getWorkingDirectory(job));
+ Assert.assertEquals(directory.toString(), job.getConfiguration().
+ get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ } catch (IOException e) {
+ LOG.error("Exception encountered while running test", e);
+ Assert.fail("Failed while testing setWorkingDirectory");
+ }
+ }
+
+ @Test
+ public void testGetOutputCommitter() {
+ try {
+ TaskAttemptContext context = new TaskAttemptContextImpl(new Configuration(),
+ new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
+ context.getConfiguration().set("mapred.output.dir", "/out");
+ Assert.assertTrue(new CopyOutputFormat().getOutputCommitter(context) instanceof CopyCommitter);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Unable to get output committer");
+ }
+ }
+
+ @Test
+ public void testCheckOutputSpecs() {
+ try {
+ OutputFormat outputFormat = new CopyOutputFormat();
+ Job job = Job.getInstance(new Configuration());
+ JobID jobID = new JobID("200707121733", 1);
+
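+ // checkOutputSpecs() should keep failing until both the working and
+ // commit directories have been set on the job.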
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("checkOutputSpecs did not reject unset work/commit paths");
+ } catch (IllegalStateException ignore) { }
+
+ CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("checkOutputSpecs did not reject an unset commit path");
+ } catch (IllegalStateException ignore) { }
+
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "");
+ CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ Assert.fail("checkOutputSpecs did not reject an unset work path");
+ } catch (IllegalStateException ignore) { }
+
+ CopyOutputFormat.setWorkingDirectory(job, new Path("/tmp/work"));
+ CopyOutputFormat.setCommitDirectory(job, new Path("/tmp/commit"));
+ try {
+ JobContext context = new JobContextImpl(job.getConfiguration(), jobID);
+ outputFormat.checkOutputSpecs(context);
+ } catch (IllegalStateException ignore) {
+ Assert.fail("Output spec check failed.");
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing checkOutputSpecs", e);
+ Assert.fail("checkOutputSpecs failure");
+ } catch (InterruptedException e) {
+ LOG.error("Exception encountered while testing checkOutputSpecs", e);
+ Assert.fail("checkOutputSpecs failure");
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformSizeInputFormat {
+ private static final Log LOG
+ = LogFactory.getLog(TestUniformSizeInputFormat.class);
+
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+ }
+
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ final DistCpOptions distCpOptions = new DistCpOptions(sourceList, targetPath);
+ distCpOptions.setMaxMaps(nMaps);
+ return distCpOptions;
+ }
+
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ IOUtils.cleanup(null, fileSystem, outputStream);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ public void testGetSplits(int nMaps) throws Exception {
+ DistCpOptions options = getOptions(nMaps);
+ Configuration configuration = new Configuration();
+ configuration.set("mapred.map.tasks",
+ String.valueOf(options.getMaxMaps()));
+ Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/testGetSplits_1/fileList.seq");
+ CopyListing.getCopyListing(configuration, CREDENTIALS, options).
+ buildListing(listFile, options);
+
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
+ UniformSizeInputFormat uniformSizeInputFormat = new UniformSizeInputFormat();
+ List<InputSplit> splits
+ = uniformSizeInputFormat.getSplits(jobContext);
+
+ List<InputSplit> legacySplits = legacyGetSplits(listFile, nMaps);
+
+ int sizePerMap = totalFileSize/nMaps;
+
+ checkSplits(listFile, splits);
+ checkAgainstLegacy(splits, legacySplits);
+
+ int doubleCheckedTotalSize = 0;
+ int previousSplitSize = -1;
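+ // Each split (except possibly the last) should be within 10% of the
+ // ideal bytes-per-map computed above.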
+ for (int i=0; i<splits.size(); ++i) {
+ InputSplit split = splits.get(i);
+ int currentSplitSize = 0;
+ RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
+ split, null);
+ StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+ recordReader, 0);
+ final TaskAttemptContext taskAttemptContext
+ = stubContext.getContext();
+ recordReader.initialize(split, taskAttemptContext);
+ while (recordReader.nextKeyValue()) {
+ Path sourcePath = recordReader.getCurrentValue().getPath();
+ FileSystem fs = sourcePath.getFileSystem(configuration);
+ FileStatus[] fileStatus = fs.listStatus(sourcePath);
+ Assert.assertEquals(1, fileStatus.length);
+ currentSplitSize += fileStatus[0].getLen();
+ }
+ Assert.assertTrue(
+ previousSplitSize == -1
+ || Math.abs(currentSplitSize - previousSplitSize) < 0.1*sizePerMap
+ || i == splits.size()-1);
+
+ doubleCheckedTotalSize += currentSplitSize;
+ }
+
+ Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+ }
+
+ // From
+ // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
+ private List<InputSplit> legacyGetSplits(Path listFile, int numSplits)
+ throws IOException {
+
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus srcst = fs.getFileStatus(listFile);
+ Configuration conf = fs.getConf();
+
+ ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+ FileStatus value = new FileStatus();
+ Text key = new Text();
+ final long targetsize = totalFileSize / numSplits;
+ long pos = 0L;
+ long last = 0L;
+ long acc = 0L;
+ long cbrem = srcst.getLen();
+ SequenceFile.Reader sl = null;
+
+ LOG.info("Average bytes per map: " + targetsize +
+ ", Number of maps: " + numSplits + ", total size: " + totalFileSize);
+
+ try {
+ sl = new SequenceFile.Reader(conf, SequenceFile.Reader.file(listFile));
+ for (; sl.next(key, value); last = sl.getPosition()) {
+ // If adding the next file would push the current split past the target size,
+ // close the current split and start a new one with this file.
+ if (acc + value.getLen() > targetsize && acc != 0) {
+ long splitsize = last - pos;
+ FileSplit fileSplit = new FileSplit(listFile, pos, splitsize, null);
+ LOG.info("Creating split : " + fileSplit + ", bytes in split: " + splitsize);
+ splits.add(fileSplit);
+ cbrem -= splitsize;
+ pos = last;
+ acc = 0L;
+ }
+ acc += value.getLen();
+ }
+ }
+ finally {
+ IOUtils.closeStream(sl);
+ }
+ if (cbrem != 0) {
+ FileSplit fileSplit = new FileSplit(listFile, pos, cbrem, null);
+ LOG.info("Creating split : " + fileSplit + ", bytes in split: " + cbrem);
+ splits.add(fileSplit);
+ }
+
+ return splits;
+ }
+
+ private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+ long lastEnd = 0;
+
+ // Verify that each split's start matches the previous split's end,
+ // so that no part of the listing file is skipped.
+ for (InputSplit split : splits) {
+ FileSplit fileSplit = (FileSplit) split;
+ long start = fileSplit.getStart();
+ Assert.assertEquals(lastEnd, start);
+ lastEnd = start + fileSplit.getLength();
+ }
+
+ // Verify that there is nothing more to read from the input file.
+ SequenceFile.Reader reader
+ = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
+ SequenceFile.Reader.file(listFile));
+
+ try {
+ reader.seek(lastEnd);
+ FileStatus srcFileStatus = new FileStatus();
+ Text srcRelPath = new Text();
+ Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ private void checkAgainstLegacy(List<InputSplit> splits,
+ List<InputSplit> legacySplits)
+ throws IOException, InterruptedException {
+
+ Assert.assertEquals(legacySplits.size(), splits.size());
+ for (int index = 0; index < splits.size(); index++) {
+ FileSplit fileSplit = (FileSplit) splits.get(index);
+ FileSplit legacyFileSplit = (FileSplit) legacySplits.get(index);
+ Assert.assertEquals(fileSplit.getStart(), legacyFileSplit.getStart());
+ }
+ }
+
+ @Test
+ public void testGetSplits() throws Exception {
+ testGetSplits(9);
+ for (int i=1; i<N_FILES; ++i)
+ testGetSplits(i);
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.junit.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.security.Credentials;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDynamicInputFormat {
+ private static final Log LOG = LogFactory.getLog(TestDynamicInputFormat.class);
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 1000;
+ private static final int NUM_SPLITS = 7;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+ private static List<String> expectedFilePaths = new ArrayList<String>(N_FILES);
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(getConfigurationForCluster())
+ .numDataNodes(1).format(true).build();
+
+ for (int i=0; i<N_FILES; ++i)
+ createFile("/tmp/source/" + String.valueOf(i));
+
+ }
+
+ private static Configuration getConfigurationForCluster() {
+ Configuration configuration = new Configuration();
+ System.setProperty("test.build.data",
+ "target/tmp/build/TEST_DYNAMIC_INPUT_FORMAT/data");
+ configuration.set("hadoop.log.dir", "target/tmp");
+ LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
+ LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
+ return configuration;
+ }
+
+ private static DistCpOptions getOptions() throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ DistCpOptions options = new DistCpOptions(sourceList, targetPath);
+ options.setMaxMaps(NUM_SPLITS);
+ return options;
+ }
+
+ private static void createFile(String path) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ expectedFilePaths.add(fileSystem.listStatus(
+ new Path(path))[0].getPath().toString());
+ }
+ finally {
+ IOUtils.cleanup(null, fileSystem, outputStream);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testGetSplits() throws Exception {
+ DistCpOptions options = getOptions();
+ Configuration configuration = new Configuration();
+ configuration.set("mapred.map.tasks",
+ String.valueOf(options.getMaxMaps()));
+ CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
+ new Path(cluster.getFileSystem().getUri().toString()
+ +"/tmp/testDynInputFormat/fileList.seq"), options);
+
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
+ DynamicInputFormat<Text, FileStatus> inputFormat =
+ new DynamicInputFormat<Text, FileStatus>();
+ List<InputSplit> splits = inputFormat.getSplits(jobContext);
+
+ int nFiles = 0;
+ int taskId = 0;
+
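+ // Drain every split through its record reader: each file seen must be one of
+ // the files created during setup, and reported progress must be non-decreasing
+ // within [0, 1].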
+ for (InputSplit split : splits) {
+ RecordReader<Text, FileStatus> recordReader =
+ inputFormat.createRecordReader(split, null);
+ StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+ recordReader, taskId);
+ final TaskAttemptContext taskAttemptContext
+ = stubContext.getContext();
+
+ recordReader.initialize(split, taskAttemptContext);
+ float previousProgressValue = 0f;
+ while (recordReader.nextKeyValue()) {
+ FileStatus fileStatus = recordReader.getCurrentValue();
+ String source = fileStatus.getPath().toString();
+ LOG.debug(source);
+ Assert.assertTrue(expectedFilePaths.contains(source));
+ final float progress = recordReader.getProgress();
+ Assert.assertTrue(progress >= previousProgressValue);
+ Assert.assertTrue(progress >= 0.0f);
+ Assert.assertTrue(progress <= 1.0f);
+ previousProgressValue = progress;
+ ++nFiles;
+ }
+ Assert.assertTrue(recordReader.getProgress() == 1.0f);
+
+ ++taskId;
+ }
+
+ Assert.assertEquals(expectedFilePaths.size(), nFiles);
+ }
+
+ @Test
+ public void testGetSplitRatio() throws Exception {
+ Assert.assertEquals(1, DynamicInputFormat.getSplitRatio(1, 1000000000));
+ Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
+ Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
+ Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.Stack;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class TestDistCpUtils {
+ private static final Log LOG = LogFactory.getLog(TestDistCpUtils.class);
+
+ private static final Configuration config = new Configuration();
+ private static MiniDFSCluster cluster;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+ .build();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetRelativePathRoot() {
+ Path root = new Path("/tmp/abc");
+ Path child = new Path("/tmp/abc/xyz/file");
+ Assert.assertEquals("/xyz/file", DistCpUtils.getRelativePath(root, child));
+
+ root = new Path("/");
+ child = new Path("/a");
+ Assert.assertEquals("/a", DistCpUtils.getRelativePath(root, child));
+ }
+
+ @Test
+ public void testPackAttributes() {
+ EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+ Assert.assertEquals("", DistCpUtils.packAttributes(attributes));
+
+ attributes.add(FileAttribute.REPLICATION);
+ Assert.assertEquals("R", DistCpUtils.packAttributes(attributes));
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("R"));
+
+ attributes.add(FileAttribute.BLOCKSIZE);
+ Assert.assertEquals("RB", DistCpUtils.packAttributes(attributes));
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RB"));
+
+ attributes.add(FileAttribute.USER);
+ Assert.assertEquals("RBU", DistCpUtils.packAttributes(attributes));
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBU"));
+
+ attributes.add(FileAttribute.GROUP);
+ Assert.assertEquals("RBUG", DistCpUtils.packAttributes(attributes));
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUG"));
+
+ attributes.add(FileAttribute.PERMISSION);
+ Assert.assertEquals("RBUGP", DistCpUtils.packAttributes(attributes));
+ Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RBUGP"));
+ }
+
+ @Test
+ public void testPreserve() {
+ try {
+ FileSystem fs = FileSystem.get(config);
+ EnumSet<FileAttribute> attributes = EnumSet.noneOf(FileAttribute.class);
+
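+ // With an empty attribute set, preserve() should leave the target's
+ // permission, owner and group untouched.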
+ Path path = new Path("/tmp/abc");
+ Path src = new Path("/tmp/src");
+ fs.mkdirs(path);
+ fs.mkdirs(src);
+ FileStatus srcStatus = fs.getFileStatus(src);
+
+ FsPermission noPerm = new FsPermission((short) 0);
+ fs.setPermission(path, noPerm);
+ fs.setOwner(path, "nobody", "nobody");
+
+ DistCpUtils.preserve(fs, path, srcStatus, attributes);
+ FileStatus target = fs.getFileStatus(path);
+ Assert.assertEquals(noPerm, target.getPermission());
+ Assert.assertEquals("nobody", target.getOwner());
+ Assert.assertEquals("nobody", target.getGroup());
+
+ attributes.add(FileAttribute.PERMISSION);
+ DistCpUtils.preserve(fs, path, srcStatus, attributes);
+ target = fs.getFileStatus(path);
+ Assert.assertEquals(srcStatus.getPermission(), target.getPermission());
+ Assert.assertEquals("nobody", target.getOwner());
+ Assert.assertEquals("nobody", target.getGroup());
+
+ attributes.add(FileAttribute.GROUP);
+ attributes.add(FileAttribute.USER);
+ DistCpUtils.preserve(fs, path, srcStatus, attributes);
+ target = fs.getFileStatus(path);
+ Assert.assertEquals(srcStatus.getPermission(), target.getPermission());
+ Assert.assertEquals(srcStatus.getOwner(), target.getOwner());
+ Assert.assertEquals(srcStatus.getGroup(), target.getGroup());
+
+ fs.delete(path, true);
+ fs.delete(src, true);
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Preserve test failure");
+ }
+ }
+
+ private static final Random rand = new Random();
+
+ public static String createTestSetup(FileSystem fs) throws IOException {
+ return createTestSetup("/tmp1", fs, FsPermission.getDefault());
+ }
+
+ public static String createTestSetup(FileSystem fs,
+ FsPermission perm) throws IOException {
+ return createTestSetup("/tmp1", fs, perm);
+ }
+
+ public static String createTestSetup(String baseDir,
+ FileSystem fs,
+ FsPermission perm) throws IOException {
+ String base = getBase(baseDir);
+ fs.mkdirs(new Path(base + "/newTest/hello/world1"));
+ fs.mkdirs(new Path(base + "/newTest/hello/world2/newworld"));
+ fs.mkdirs(new Path(base + "/newTest/hello/world3/oldworld"));
+ fs.setPermission(new Path(base + "/newTest"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world1"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world2"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world2/newworld"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world3"), perm);
+ fs.setPermission(new Path(base + "/newTest/hello/world3/oldworld"), perm);
+ createFile(fs, base + "/newTest/1");
+ createFile(fs, base + "/newTest/hello/2");
+ createFile(fs, base + "/newTest/hello/world3/oldworld/3");
+ createFile(fs, base + "/newTest/hello/world2/4");
+ return base;
+ }
+
+ private static String getBase(String base) {
+ String location = String.valueOf(rand.nextLong());
+ return base + "/" + location;
+ }
+
+ public static void delete(FileSystem fs, String path) {
+ try {
+ if (fs != null) {
+ if (path != null) {
+ fs.delete(new Path(path), true);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception encountered ", e);
+ }
+ }
+
+ public static void createFile(FileSystem fs, String filePath) throws IOException {
+ OutputStream out = fs.create(new Path(filePath));
+ IOUtils.closeStream(out);
+ }
+
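+ /**
+ * Asserts that every path found under targetBase also exists under
+ * sourceBase at the same relative location; returns true when the
+ * traversal completes without an assertion failure.
+ */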
+ public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase)
+ throws IOException {
+ Path base = new Path(targetBase);
+
+ Stack<Path> stack = new Stack<Path>();
+ stack.push(base);
+ while (!stack.isEmpty()) {
+ Path file = stack.pop();
+ if (!fs.exists(file)) continue;
+ FileStatus[] fStatus = fs.listStatus(file);
+ if (fStatus == null || fStatus.length == 0) continue;
+
+ for (FileStatus status : fStatus) {
+ if (status.isDirectory()) {
+ stack.push(status.getPath());
+ }
+ Assert.assertTrue(fs.exists(new Path(sourceBase + "/" +
+ DistCpUtils.getRelativePath(new Path(targetBase), status.getPath()))));
+ }
+ }
+ return true;
+ }
+}
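The pack/unpack assertions above pin down the compact format DistCp uses to
pass its preserve-flags around: each preserved attribute contributes one
character to the packed string (e.g. 'U' for USER, 'G' for GROUP), and
unpackAttributes() restores the EnumSet. A minimal round-trip sketch,
assuming FileAttribute is the enum nested in DistCpOptions that these tests
exercise:

    import java.util.EnumSet;

    import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
    import org.apache.hadoop.tools.util.DistCpUtils;

    public class PackAttributesRoundTrip {
      public static void main(String[] args) {
        // USER and GROUP pack to "UG", following enum declaration order.
        EnumSet<FileAttribute> attrs =
            EnumSet.of(FileAttribute.USER, FileAttribute.GROUP);
        String packed = DistCpUtils.packAttributes(attrs);
        EnumSet<FileAttribute> unpacked = DistCpUtils.unpackAttributes(packed);
        System.out.println(packed + " -> " + unpacked);
      }
    }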
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TestRetriableCommand {
+
+ private static class MyRetriableCommand extends RetriableCommand {
+
+ private int succeedAfter;
+ private int retryCount = 0;
+
+ public MyRetriableCommand(int succeedAfter) {
+ super("MyRetriableCommand");
+ this.succeedAfter = succeedAfter;
+ }
+
+ public MyRetriableCommand(int succeedAfter, RetryPolicy retryPolicy) {
+ super("MyRetriableCommand", retryPolicy);
+ this.succeedAfter = succeedAfter;
+ }
+
+ @Override
+ protected Object doExecute(Object... arguments) throws Exception {
+ if (++retryCount < succeedAfter)
+ throw new Exception("Transient failure#" + retryCount);
+ return 0;
+ }
+ }
+
+ @Test
+ public void testRetriableCommand() {
+ try {
+ new MyRetriableCommand(5).execute(0);
+ Assert.fail("Expected failure: the default retry-policy allows fewer than five attempts.");
+ }
+ catch (Exception e) {
+ // Expected: the command gives up before the fifth attempt succeeds.
+ }
+
+ try {
+ new MyRetriableCommand(3).execute(0);
+ }
+ catch (Exception e) {
+ Assert.fail("Execution should have succeeded within the default retries.");
+ }
+
+ try {
+ new MyRetriableCommand(5, RetryPolicies.
+ retryUpToMaximumCountWithFixedSleep(5, 0, TimeUnit.MILLISECONDS)).execute(0);
+ }
+ catch (Exception e) {
+ Assert.fail("Execution should have succeeded with the wider retry-policy.");
+ }
+ }
+}
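TestRetriableCommand also documents the extension contract: subclass
RetriableCommand, implement doExecute(), and optionally pass a RetryPolicy
to the constructor; execute() then retries transient failures until the
policy is exhausted. A sketch of a production-style subclass, where fetch()
and its URI argument are hypothetical stand-ins:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.tools.util.RetriableCommand;

    public class FlakyFetchCommand extends RetriableCommand {
      public FlakyFetchCommand() {
        // Up to 3 attempts, 100ms apart; the policy choice is illustrative.
        super("FlakyFetchCommand", RetryPolicies
            .retryUpToMaximumCountWithFixedSleep(3, 100, TimeUnit.MILLISECONDS));
      }

      @Override
      protected Object doExecute(Object... arguments) throws Exception {
        return fetch((String) arguments[0]);  // hypothetical flaky operation
      }

      private Object fetch(String uri) throws Exception {
        return uri;  // placeholder for work that may fail transiently
      }
    }

A caller would run new FlakyFetchCommand().execute("some-uri") and let the
retry machinery absorb any transient exceptions.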
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.*;
+
+public class TestThrottledInputStream {
+ private static final Log LOG = LogFactory.getLog(TestThrottledInputStream.class);
+ private static final int BUFF_SIZE = 1024;
+
+ private enum CB {ONE_C, BUFFER, BUFF_OFFSET}
+
+ @Test
+ public void testRead() {
+ File tmpFile;
+ File outFile;
+ try {
+ tmpFile = createFile(1024);
+ outFile = createFile();
+
+ tmpFile.deleteOnExit();
+ outFile.deleteOnExit();
+
+ long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER);
+
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER);
+/*
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFFER);
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFFER);
+*/
+
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFF_OFFSET);
+/*
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFF_OFFSET);
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFF_OFFSET);
+*/
+
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.ONE_C);
+/*
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.ONE_C);
+ copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.ONE_C);
+*/
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Throttled read test failed: " + e.getMessage());
+ }
+ }
+
+ private long copyAndAssert(File tmpFile, File outFile,
+ long maxBandwidth, float factor,
+ int sleepTime, CB flag) throws IOException {
+ long bandwidth;
+ ThrottledInputStream in;
+ long maxBPS = (long) (maxBandwidth / factor);
+
+ if (maxBandwidth == 0) {
+ in = new ThrottledInputStream(new FileInputStream(tmpFile));
+ } else {
+ in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
+ }
+ OutputStream out = new FileOutputStream(outFile);
+ try {
+ if (flag == CB.BUFFER) {
+ copyBytes(in, out, BUFF_SIZE);
+ } else if (flag == CB.BUFF_OFFSET){
+ copyBytesWithOffset(in, out, BUFF_SIZE);
+ } else {
+ copyByteByByte(in, out);
+ }
+
+ LOG.info(in);
+ bandwidth = in.getBytesPerSec();
+ Assert.assertEquals(tmpFile.length(), in.getTotalBytesRead());
+ Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
+ Assert.assertTrue(in.getTotalSleepTime() > sleepTime || in.getBytesPerSec() <= maxBPS);
+ } finally {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ }
+ return bandwidth;
+ }
+
+ private static void copyBytesWithOffset(InputStream in, OutputStream out, int buffSize)
+ throws IOException {
+
+ byte buf[] = new byte[buffSize];
+ int bytesRead = in.read(buf, 0, buffSize);
+ while (bytesRead >= 0) {
+ out.write(buf, 0, bytesRead);
+ // Keep using the offset variant, so this helper actually exercises
+ // read(byte[], int, int) rather than falling back to read(byte[]).
+ bytesRead = in.read(buf, 0, buffSize);
+ }
+ }
+
+ private static void copyByteByByte(InputStream in, OutputStream out)
+ throws IOException {
+
+ int ch = in.read();
+ while (ch >= 0) {
+ out.write(ch);
+ ch = in.read();
+ }
+ }
+
+ private static void copyBytes(InputStream in, OutputStream out, int buffSize)
+ throws IOException {
+
+ byte buf[] = new byte[buffSize];
+ int bytesRead = in.read(buf);
+ while (bytesRead >= 0) {
+ out.write(buf, 0, bytesRead);
+ bytesRead = in.read(buf);
+ }
+ }
+
+ private File createFile(long sizeInKB) throws IOException {
+ File tmpFile = createFile();
+ writeToFile(tmpFile, sizeInKB);
+ return tmpFile;
+ }
+
+ private File createFile() throws IOException {
+ return File.createTempFile("tmp", "dat");
+ }
+
+ private void writeToFile(File tmpFile, long sizeInKB) throws IOException {
+ OutputStream out = new FileOutputStream(tmpFile);
+ try {
+ byte[] buffer = new byte [1024];
+ for (long index = 0; index < sizeInKB; index++) {
+ out.write(buffer);
+ }
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+}
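The contract this suite pins down: ThrottledInputStream decorates any
InputStream, sleeping inside read() so the observed rate stays at or below
the configured bytes-per-second cap, and it exposes the counters
(getTotalBytesRead, getBytesPerSec, getTotalSleepTime) that the assertions
above rely on. A minimal consumer sketch, with the input path as a
placeholder:

    import java.io.FileInputStream;
    import java.io.InputStream;

    import org.apache.hadoop.tools.util.ThrottledInputStream;

    public class ThrottledReadExample {
      public static void main(String[] args) throws Exception {
        // Cap reads at roughly 1 MB/s; the wrapper sleeps to hold the cap.
        InputStream in = new ThrottledInputStream(
            new FileInputStream("/tmp/data.bin"), 1024 * 1024);
        try {
          byte[] buf = new byte[4096];
          long total = 0;
          int n;
          while ((n = in.read(buf)) >= 0) {
            total += n;  // consume buf[0..n) here
          }
          System.out.println("Read " + total + " bytes");
        } finally {
          in.close();
        }
      }
    }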
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+<property>
+ <name>ssl.client.truststore.location</name>
+ <value>/path/to/truststore/keys/keystore.jks</value>
+ <description>Truststore to be used by clients like distcp. Must be
+ specified.
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.truststore.password</name>
+ <value>changeit</value>
+ <description>Optional. Default value is "".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.truststore.type</name>
+ <value>jks</value>
+ <description>Optional. Default value is "jks".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.location</name>
+ <value>/path/to/keystore/keys/keystore.jks</value>
+ <description>Keystore to be used by clients like distcp. Must be
+ specified.
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.password</name>
+ <value>changeit</value>
+ <description>Optional. Default value is "".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.keypassword</name>
+ <value>changeit</value>
+ <description>Optional. Default value is "".
+ </description>
+</property>
+
+<property>
+ <name>ssl.client.keystore.type</name>
+ <value>jks</value>
+ <description>Optional. Default value is "jks".
+ </description>
+</property>
+
+</configuration>
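This resource is a template for the client-side SSL settings DistCp needs
when reading over hsftp: the ssl.client.* keys locate the truststore and
keystore and carry their passwords. A sketch of loading it with the stock
Configuration machinery (the file path is a placeholder, and wiring the
file into a DistCp run via its SSL-configuration option is assumed rather
than shown):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class SslConfReader {
      public static void main(String[] args) {
        Configuration sslConf = new Configuration(false);  // skip default resources
        sslConf.addResource(new Path("/path/to/sslConfig.xml"));
        // Key names match the properties defined above.
        System.out.println(sslConf.get("ssl.client.truststore.location"));
        System.out.println(sslConf.get("ssl.client.truststore.type", "jks"));
      }
    }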
Modified: hadoop/common/trunk/hadoop-tools/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/pom.xml?rev=1236045&r1=1236044&r2=1236045&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/pom.xml (original)
+++ hadoop/common/trunk/hadoop-tools/pom.xml Thu Jan 26 06:36:52 2012
@@ -29,6 +29,7 @@
<modules>
<module>hadoop-streaming</module>
+ <module>hadoop-distcp</module>
<module>hadoop-archives</module>
<module>hadoop-rumen</module>
<module>hadoop-tools-dist</module>
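
With hadoop-distcp registered in the <modules> list, the new code builds as
part of the hadoop-tools aggregator; assuming a standard Maven setup,
something like "mvn test -pl hadoop-tools/hadoop-distcp" from the trunk root
should compile the module and run the test suites added above.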