Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:07 UTC
svn commit: r1619012 [2/3] - in /hadoop/common/branches/HADOOP-10388: ./
hadoop-assemblies/src/main/resources/assemblies/ hadoop-client/
hadoop-dist/ hadoop-maven-plugins/
hadoop-maven-plugins/src/main/java/org/apache/hadoop/maven/plugin/protoc/
hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java Tue Aug 19 23:49:39 2014
@@ -23,11 +23,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -44,7 +44,8 @@ import java.util.ArrayList;
* that the total-number of bytes to be copied for each input split is
* uniform.
*/
-public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
+public class UniformSizeInputFormat
+ extends InputFormat<Text, CopyListingFileStatus> {
private static final Log LOG
= LogFactory.getLog(UniformSizeInputFormat.class);
@@ -76,7 +77,7 @@ public class UniformSizeInputFormat exte
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);
- FileStatus srcFileStatus = new FileStatus();
+ CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
long currentSplitSize = 0;
long lastSplitStart = 0;
@@ -161,9 +162,9 @@ public class UniformSizeInputFormat exte
* @throws InterruptedException
*/
@Override
- public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new SequenceFileRecordReader<Text, FileStatus>();
+ public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
}
}
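
[Editor's sketch, not part of this commit: the hunks above retype UniformSizeInputFormat's listing value class from FileStatus to CopyListingFileStatus. A minimal, hedged example of reading a copy-listing SequenceFile with the new value type; the listing path is an assumption for illustration.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.tools.CopyListingFileStatus;

  public class ListingDump {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Path listing = new Path(args[0]);  // assumed: path to a distcp copy listing
      SequenceFile.Reader reader = new SequenceFile.Reader(conf,
          SequenceFile.Reader.file(listing));
      try {
        Text relPath = new Text();
        CopyListingFileStatus status = new CopyListingFileStatus();
        // Entries are (relative path, CopyListingFileStatus) pairs, as the
        // retyped InputFormat now expects.
        while (reader.next(relPath, status)) {
          System.out.println(relPath + " -> " + status.getLen() + " bytes");
        }
      } finally {
        reader.close();
      }
    }
  }
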
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
@@ -90,7 +91,7 @@ class DynamicInputChunk<K, V> {
private void openForWrite() throws IOException {
writer = SequenceFile.createWriter(
chunkFilePath.getFileSystem(configuration), configuration,
- chunkFilePath, Text.class, FileStatus.class,
+ chunkFilePath, Text.class, CopyListingFileStatus.class,
SequenceFile.CompressionType.NONE);
}
@@ -117,7 +118,7 @@ class DynamicInputChunk<K, V> {
* @param value Corresponding value from the listing file.
* @throws IOException Exception onf failure to write to the file.
*/
- public void write(Text key, FileStatus value) throws IOException {
+ public void write(Text key, CopyListingFileStatus value) throws IOException {
writer.append(key, value);
}
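
[Editor's sketch, not part of this commit: DynamicInputChunk now writes (Text, CopyListingFileStatus) pairs. A hedged example mirroring openForWrite() and write() above; the chunk path and source file are assumptions.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.tools.CopyListingFileStatus;

  public class ChunkWriteSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Path chunkPath = new Path("/tmp/chunk.00001");  // assumed chunk file
      Path source = new Path("/src/file");            // assumed source file
      FileSystem fs = chunkPath.getFileSystem(conf);
      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, chunkPath, Text.class, CopyListingFileStatus.class,
          SequenceFile.CompressionType.NONE);
      try {
        // Wrap the plain FileStatus, exactly as the retyped write() expects.
        writer.append(new Text("/file"),
            new CopyListingFileStatus(fs.getFileStatus(source)));
      } finally {
        writer.close();
      }
    }
  }
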
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java Tue Aug 19 23:49:39 2014
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import java.util.List;
import java.util.ArrayList;
@@ -57,7 +57,7 @@ public class DynamicInputFormat<K, V> ex
= "mapred.num.splits";
private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
= "mapred.num.entries.per.chunk";
-
+
/**
* Implementation of InputFormat::getSplits(). This method splits up the
* copy-listing file into chunks, and assigns the first batch to different
@@ -91,7 +91,7 @@ public class DynamicInputFormat<K, V> ex
// Setting non-zero length for FileSplit size, to avoid a possible
// future when 0-sized file-splits are considered "empty" and skipped
// over.
- MIN_RECORDS_PER_CHUNK,
+ getMinRecordsPerChunk(jobContext.getConfiguration()),
null));
}
DistCpUtils.publish(jobContext.getConfiguration(),
@@ -107,9 +107,11 @@ public class DynamicInputFormat<K, V> ex
final Configuration configuration = context.getConfiguration();
int numRecords = getNumberOfRecords(configuration);
int numMaps = getNumMapTasks(configuration);
+ int maxChunksTolerable = getMaxChunksTolerable(configuration);
+
// Number of chunks each map will process, on average.
int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
- validateNumChunksUsing(splitRatio, numMaps);
+ validateNumChunksUsing(splitRatio, numMaps, maxChunksTolerable);
int numEntriesPerChunk = (int)Math.ceil((float)numRecords
/(splitRatio * numMaps));
@@ -131,7 +133,7 @@ public class DynamicInputFormat<K, V> ex
List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
- FileStatus fileStatus = new FileStatus();
+ CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relPath = new Text();
int recordCounter = 0;
int chunkCount = 0;
@@ -168,9 +170,9 @@ public class DynamicInputFormat<K, V> ex
return chunksFinal;
}
- private static void validateNumChunksUsing(int splitRatio, int numMaps)
- throws IOException {
- if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
+ private static void validateNumChunksUsing(int splitRatio, int numMaps,
+ int maxChunksTolerable) throws IOException {
+ if (splitRatio * numMaps > maxChunksTolerable)
throw new IOException("Too many chunks created with splitRatio:"
+ splitRatio + ", numMaps:" + numMaps
+ ". Reduce numMaps or decrease split-ratio to proceed.");
@@ -238,14 +240,61 @@ public class DynamicInputFormat<K, V> ex
int numMaps, int numPaths) {
return configuration.getInt(
CONF_LABEL_LISTING_SPLIT_RATIO,
- getSplitRatio(numMaps, numPaths));
+ getSplitRatio(numMaps, numPaths, configuration));
+ }
+
+ private static int getMaxChunksTolerable(Configuration conf) {
+ int maxChunksTolerable = conf.getInt(
+ DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE,
+ DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
+ if (maxChunksTolerable <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
+ maxChunksTolerable = DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT;
+ }
+ return maxChunksTolerable;
+ }
+
+ private static int getMaxChunksIdeal(Configuration conf) {
+ int maxChunksIdeal = conf.getInt(
+ DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL,
+ DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
+ if (maxChunksIdeal <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
+ maxChunksIdeal = DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT;
+ }
+ return maxChunksIdeal;
+ }
+
+ private static int getMinRecordsPerChunk(Configuration conf) {
+ int minRecordsPerChunk = conf.getInt(
+ DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK,
+ DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
+ if (minRecordsPerChunk <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
+ minRecordsPerChunk = DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT;
+ }
+ return minRecordsPerChunk;
}
- private static final int MAX_CHUNKS_TOLERABLE = 400;
- private static final int MAX_CHUNKS_IDEAL = 100;
- private static final int MIN_RECORDS_PER_CHUNK = 5;
- private static final int SPLIT_RATIO_DEFAULT = 2;
-
+ private static int getSplitRatio(Configuration conf) {
+ int splitRatio = conf.getInt(
+ DistCpConstants.CONF_LABEL_SPLIT_RATIO,
+ DistCpConstants.SPLIT_RATIO_DEFAULT);
+ if (splitRatio <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_SPLIT_RATIO +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.SPLIT_RATIO_DEFAULT);
+ splitRatio = DistCpConstants.SPLIT_RATIO_DEFAULT;
+ }
+ return splitRatio;
+ }
+
/**
* Package private, for testability.
* @param nMaps The number of maps requested for.
@@ -253,19 +302,34 @@ public class DynamicInputFormat<K, V> ex
* @return The number of splits each map should handle, ideally.
*/
static int getSplitRatio(int nMaps, int nRecords) {
+ return getSplitRatio(nMaps, nRecords, new Configuration());
+ }
+
+ /**
+ * Package private, for testability.
+ * @param nMaps The number of maps requested for.
+ * @param nRecords The number of records to be copied.
+ * @param conf The configuration set by users.
+ * @return The number of splits each map should handle, ideally.
+ */
+ static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
+ int maxChunksIdeal = getMaxChunksIdeal(conf);
+ int minRecordsPerChunk = getMinRecordsPerChunk(conf);
+ int splitRatio = getSplitRatio(conf);
+
if (nMaps == 1) {
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
return 1;
}
- if (nMaps > MAX_CHUNKS_IDEAL)
- return SPLIT_RATIO_DEFAULT;
+ if (nMaps > maxChunksIdeal)
+ return splitRatio;
- int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
+ int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
- return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
- SPLIT_RATIO_DEFAULT : nPickups;
+ return nRecordsPerChunk < minRecordsPerChunk ?
+ splitRatio : nPickups;
}
static int getNumEntriesPerChunk(Configuration configuration) {
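
[Editor's sketch, not part of this commit: the formerly hard-coded limits (MAX_CHUNKS_TOLERABLE=400, MAX_CHUNKS_IDEAL=100, MIN_RECORDS_PER_CHUNK=5, SPLIT_RATIO_DEFAULT=2) are now read from configuration via DistCpConstants, falling back to the defaults when unset or non-positive. A hedged example of tuning them; only the constant names appear in this diff, the underlying property strings live in DistCpConstants and are not reproduced here.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.tools.DistCpConstants;

  public class ChunkTuning {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, 800);
      conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, 200);
      conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, 5);
      conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 2);
      // Worked example of getSplitRatio(nMaps=40, nRecords=4000, conf):
      //   nPickups         = ceil(200 / 40)        = 5
      //   nRecordsPerChunk = ceil(4000 / (40 * 5)) = 20, which is >= 5
      //   (the minimum), so the method returns nPickups = 5; and
      //   validateNumChunksUsing passes since 5 * 40 = 200 <= 800.
    }
  }
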
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java Tue Aug 19 23:49:39 2014
@@ -25,16 +25,26 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
+import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.mapreduce.InputFormat;
import java.io.IOException;
import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
import java.text.DecimalFormat;
import java.net.URI;
import java.net.InetAddress;
@@ -181,7 +191,7 @@ public class DistCpUtils {
* change or any transient error)
*/
public static void preserve(FileSystem targetFS, Path path,
- FileStatus srcFileStatus,
+ CopyListingFileStatus srcFileStatus,
EnumSet<FileAttribute> attributes) throws IOException {
FileStatus targetFileStatus = targetFS.getFileStatus(path);
@@ -189,10 +199,33 @@ public class DistCpUtils {
String user = targetFileStatus.getOwner();
boolean chown = false;
- if (attributes.contains(FileAttribute.PERMISSION) &&
+ if (attributes.contains(FileAttribute.ACL)) {
+ List<AclEntry> srcAcl = srcFileStatus.getAclEntries();
+ List<AclEntry> targetAcl = getAcl(targetFS, targetFileStatus);
+ if (!srcAcl.equals(targetAcl)) {
+ targetFS.setAcl(path, srcAcl);
+ }
+ // setAcl can't preserve sticky bit, so also call setPermission if needed.
+ if (srcFileStatus.getPermission().getStickyBit() !=
+ targetFileStatus.getPermission().getStickyBit()) {
+ targetFS.setPermission(path, srcFileStatus.getPermission());
+ }
+ } else if (attributes.contains(FileAttribute.PERMISSION) &&
!srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
targetFS.setPermission(path, srcFileStatus.getPermission());
}
+
+ if (attributes.contains(FileAttribute.XATTR)) {
+ Map<String, byte[]> srcXAttrs = srcFileStatus.getXAttrs();
+ Map<String, byte[]> targetXAttrs = getXAttrs(targetFS, path);
+ if (!srcXAttrs.equals(targetXAttrs)) {
+ Iterator<Entry<String, byte[]>> iter = srcXAttrs.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<String, byte[]> entry = iter.next();
+ targetFS.setXAttr(path, entry.getKey(), entry.getValue());
+ }
+ }
+ }
if (attributes.contains(FileAttribute.REPLICATION) && ! targetFileStatus.isDirectory() &&
srcFileStatus.getReplication() != targetFileStatus.getReplication()) {
@@ -217,6 +250,65 @@ public class DistCpUtils {
}
/**
+ * Returns a file's full logical ACL.
+ *
+ * @param fileSystem FileSystem containing the file
+ * @param fileStatus FileStatus of file
+ * @return List<AclEntry> containing full logical ACL
+ * @throws IOException if there is an I/O error
+ */
+ public static List<AclEntry> getAcl(FileSystem fileSystem,
+ FileStatus fileStatus) throws IOException {
+ List<AclEntry> entries = fileSystem.getAclStatus(fileStatus.getPath())
+ .getEntries();
+ return AclUtil.getAclFromPermAndEntries(fileStatus.getPermission(), entries);
+ }
+
+ /**
+ * Returns all of a file's xAttrs.
+ *
+ * @param fileSystem FileSystem containing the file
+ * @param path file path
+ * @return Map<String, byte[]> containing all xAttrs
+ * @throws IOException if there is an I/O error
+ */
+ public static Map<String, byte[]> getXAttrs(FileSystem fileSystem,
+ Path path) throws IOException {
+ return fileSystem.getXAttrs(path);
+ }
+
+ /**
+ * Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs,
+ * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs,
+ * populates the CopyListingFileStatus with the XAttrs.
+ *
+ * @param fileSystem FileSystem containing the file
+ * @param fileStatus FileStatus of file
+ * @param preserveAcls boolean true if preserving ACLs
+ * @param preserveXAttrs boolean true if preserving XAttrs
+ * @throws IOException if there is an I/O error
+ */
+ public static CopyListingFileStatus toCopyListingFileStatus(
+ FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls,
+ boolean preserveXAttrs) throws IOException {
+ CopyListingFileStatus copyListingFileStatus =
+ new CopyListingFileStatus(fileStatus);
+ if (preserveAcls) {
+ FsPermission perm = fileStatus.getPermission();
+ if (perm.getAclBit()) {
+ List<AclEntry> aclEntries = fileSystem.getAclStatus(
+ fileStatus.getPath()).getEntries();
+ copyListingFileStatus.setAclEntries(aclEntries);
+ }
+ }
+ if (preserveXAttrs) {
+ Map<String, byte[]> xAttrs = fileSystem.getXAttrs(fileStatus.getPath());
+ copyListingFileStatus.setXAttrs(xAttrs);
+ }
+ return copyListingFileStatus;
+ }
+
+ /**
* Sort sequence file containing FileStatus and Text as key and value respectively
*
* @param fs - File System
@@ -227,7 +319,8 @@ public class DistCpUtils {
*/
public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
throws IOException {
- SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, FileStatus.class, conf);
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
+ CopyListingFileStatus.class, conf);
Path output = new Path(sourceListing.toString() + "_sorted");
if (fs.exists(output)) {
@@ -239,6 +332,44 @@ public class DistCpUtils {
}
/**
+ * Determines if a file system supports ACLs by running a canary getAclStatus
+ * request on the file system root. This method is used before distcp job
+ * submission to fail fast if the user requested preserving ACLs, but the file
+ * system cannot support ACLs.
+ *
+ * @param fs FileSystem to check
+ * @throws AclsNotSupportedException if fs does not support ACLs
+ */
+ public static void checkFileSystemAclSupport(FileSystem fs)
+ throws AclsNotSupportedException {
+ try {
+ fs.getAclStatus(new Path(Path.SEPARATOR));
+ } catch (Exception e) {
+ throw new AclsNotSupportedException("ACLs not supported for file system: "
+ + fs.getUri());
+ }
+ }
+
+ /**
+ * Determines if a file system supports XAttrs by running a getXAttrs request
+ * on the file system root. This method is used before distcp job submission
+ * to fail fast if the user requested preserving XAttrs, but the file system
+ * cannot support XAttrs.
+ *
+ * @param fs FileSystem to check
+ * @throws XAttrsNotSupportedException if fs does not support XAttrs
+ */
+ public static void checkFileSystemXAttrSupport(FileSystem fs)
+ throws XAttrsNotSupportedException {
+ try {
+ fs.getXAttrs(new Path(Path.SEPARATOR));
+ } catch (Exception e) {
+ throw new XAttrsNotSupportedException("XAttrs not supported for file system: "
+ + fs.getUri());
+ }
+ }
+
+ /**
* String utility to convert a number-of-bytes to human readable format.
*/
private static ThreadLocal<DecimalFormat> FORMATTER
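
[Editor's sketch, not part of this commit: taken together, the DistCpUtils changes thread ACLs and XAttrs through the copy listing and the preserve path, and add fail-fast capability checks before job submission. A hedged end-to-end example; paths and the single shared FileSystem are illustrative assumptions.]

  import java.util.EnumSet;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.tools.CopyListingFileStatus;
  import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
  import org.apache.hadoop.tools.util.DistCpUtils;

  public class PreserveSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);  // assumed: same fs for src and dst
      // Fail fast if the file system cannot support ACLs or XAttrs.
      DistCpUtils.checkFileSystemAclSupport(fs);
      DistCpUtils.checkFileSystemXAttrSupport(fs);
      // Capture ACLs and XAttrs into the listing entry...
      CopyListingFileStatus src = DistCpUtils.toCopyListingFileStatus(
          fs, fs.getFileStatus(new Path("/src/file")),
          true /* preserveAcls */, true /* preserveXAttrs */);
      // ...and replay them onto the copied target.
      DistCpUtils.preserve(fs, new Path("/dst/file"), src,
          EnumSet.of(FileAttribute.ACL, FileAttribute.XATTR));
    }
  }
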
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,11 @@ package org.apache.hadoop.tools.util;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+
+import com.google.common.base.Preconditions;
+
/**
* The ThrottledInputStream provides bandwidth throttling on a specified
* InputStream. It is implemented as a wrapper on top of another InputStream
@@ -90,6 +95,25 @@ public class ThrottledInputStream extend
return readLen;
}
+ /**
+ * Read bytes starting from the specified position. This requires that
+ * rawStream be an instance of {@link PositionedReadable}.
+ */
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ if (!(rawStream instanceof PositionedReadable)) {
+ throw new UnsupportedOperationException(
+ "positioned read is not supported by the internal stream");
+ }
+ throttle();
+ int readLen = ((PositionedReadable) rawStream).read(position, buffer,
+ offset, length);
+ if (readLen != -1) {
+ bytesRead += readLen;
+ }
+ return readLen;
+ }
+
private void throttle() throws IOException {
if (getBytesPerSec() > maxBytesPerSec) {
try {
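
[Editor's sketch, not part of this commit: the new positioned read delegates to PositionedReadable after applying the same throttle as the ordinary reads. A hedged usage example; the two-argument ThrottledInputStream constructor (stream plus bytes-per-second cap) is assumed from the maxBytesPerSec field referenced above.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.tools.util.ThrottledInputStream;

  public class PositionedReadSketch {
    public static void main(String[] args) throws Exception {
      FileSystem fs = FileSystem.get(new Configuration());
      // FSDataInputStream implements PositionedReadable, so the new
      // read(position, ...) overload applies; a plain stream would get
      // UnsupportedOperationException.
      FSDataInputStream raw = fs.open(new Path("/src/file"));  // assumed path
      ThrottledInputStream in =
          new ThrottledInputStream(raw, 1024 * 1024 /* bytes/sec, assumed */);
      byte[] buf = new byte[4096];
      int n = in.read(2048L, buf, 0, buf.length);  // throttled, positioned
      System.out.println("read " + n + " bytes at offset 2048");
      in.close();
    }
  }
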
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml Tue Aug 19 23:49:39 2014
@@ -1,8 +1,6 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
- Copyright 2002-2004 The Apache Software Foundation
-
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.task.
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
@@ -33,18 +32,19 @@ import java.io.IOException;
public class StubContext {
private StubStatusReporter reporter = new StubStatusReporter();
- private RecordReader<Text, FileStatus> reader;
+ private RecordReader<Text, CopyListingFileStatus> reader;
private StubInMemoryWriter writer = new StubInMemoryWriter();
- private Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
+ private Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapperContext;
- public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
- int taskId) throws IOException, InterruptedException {
+ public StubContext(Configuration conf,
+ RecordReader<Text, CopyListingFileStatus> reader, int taskId)
+ throws IOException, InterruptedException {
- WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper
- = new WrappedMapper<Text, FileStatus, Text, Text>();
+ WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper
+ = new WrappedMapper<Text, CopyListingFileStatus, Text, Text>();
- MapContextImpl<Text, FileStatus, Text, Text> contextImpl
- = new MapContextImpl<Text, FileStatus, Text, Text>(conf,
+ MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl
+ = new MapContextImpl<Text, CopyListingFileStatus, Text, Text>(conf,
getTaskAttemptID(taskId), reader, writer,
null, reporter, null);
@@ -52,7 +52,7 @@ public class StubContext {
this.mapperContext = wrappedMapper.getMapContext(contextImpl);
}
- public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
+ public Mapper<Text, CopyListingFileStatus, Text, Text>.Context getContext() {
return mapperContext;
}
@@ -60,7 +60,7 @@ public class StubContext {
return reporter;
}
- public RecordReader<Text, FileStatus> getReader() {
+ public RecordReader<Text, CopyListingFileStatus> getReader() {
return reader;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.tools.util.TestDistCpUtils;
@@ -106,7 +105,7 @@ public class TestCopyListing extends Sim
Assert.assertEquals(listing.getNumberOfPaths(), 3);
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
SequenceFile.Reader.file(listingFile));
- FileStatus fileStatus = new FileStatus();
+ CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/1");
@@ -274,7 +273,7 @@ public class TestCopyListing extends Sim
reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(listFile));
- FileStatus fileStatus = new FileStatus();
+ CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertTrue(relativePath.toString().equals(""));
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -531,7 +530,7 @@ public class TestFileBasedCopyListing {
SequenceFile.Reader.file(listFile));
try {
Text relPath = new Text();
- FileStatus fileStatus = new FileStatus();
+ CopyListingFileStatus fileStatus = new CopyListingFileStatus();
while (reader.next(relPath, fileStatus)) {
if (fileStatus.isDirectory() && relPath.toString().equals("")) {
// ignore root with empty relPath, which is an entry to be
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java Tue Aug 19 23:49:39 2014
@@ -19,7 +19,6 @@
package org.apache.hadoop.tools;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -121,7 +120,7 @@ public class TestGlobbedCopyListing {
SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
listingPath, new Configuration());
Text key = new Text();
- FileStatus value = new FileStatus();
+ CopyListingFileStatus value = new CopyListingFileStatus();
Map<String, String> actualValues = new HashMap<String, String>();
while (reader.next(key, value)) {
if (value.isDirectory() && key.toString().equals("")) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java Tue Aug 19 23:49:39 2014
@@ -18,9 +18,12 @@
package org.apache.hadoop.tools;
+import static org.junit.Assert.fail;
+
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.DistCpOptions.*;
import org.apache.hadoop.conf.Configuration;
@@ -410,6 +413,8 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
options = OptionsParser.parse(new String[] {
"-p",
@@ -421,6 +426,8 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
options = OptionsParser.parse(new String[] {
"-pbr",
@@ -433,6 +440,8 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
options = OptionsParser.parse(new String[] {
"-pbrgup",
@@ -445,9 +454,11 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
options = OptionsParser.parse(new String[] {
- "-pbrgupc",
+ "-pbrgupcax",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
@@ -457,6 +468,8 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.ACL));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.XATTR));
options = OptionsParser.parse(new String[] {
"-pc",
@@ -469,6 +482,8 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
options = OptionsParser.parse(new String[] {
"-p",
@@ -485,7 +500,7 @@ public class TestOptionsParser {
try {
OptionsParser.parse(new String[] {
- "-pabc",
+ "-pabcd",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target"});
@@ -548,4 +563,45 @@ public class TestOptionsParser {
Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
}
+
+ @Test
+ public void testAppendOption() {
+ Configuration conf = new Configuration();
+ Assert.assertFalse(conf.getBoolean(
+ DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+ Assert.assertFalse(conf.getBoolean(
+ DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+ DistCpOptions options = OptionsParser.parse(new String[] { "-update",
+ "-append", "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/" });
+ options.appendToConf(conf);
+ Assert.assertTrue(conf.getBoolean(
+ DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+ Assert.assertTrue(conf.getBoolean(
+ DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+ // make sure -append is only valid when -update is specified
+ try {
+ options = OptionsParser.parse(new String[] { "-append",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/" });
+ fail("Append should fail if update option is not specified");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Append is valid only with update options", e);
+ }
+
+ // make sure -append is invalid when skipCrc is specified
+ try {
+ options = OptionsParser.parse(new String[] {
+ "-append", "-update", "-skipcrccheck",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/" });
+ fail("Append should fail if skipCrc option is specified");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Append is disallowed when skipping CRC", e);
+ }
+ }
}
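
[Editor's sketch, not part of this commit: as testAppendOption documents, -append rides on -update (it also sets SYNC_FOLDERS) and is rejected alongside -skipcrccheck. A condensed example of the valid invocation, built only from calls shown in the test above.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.tools.DistCpOptionSwitch;
  import org.apache.hadoop.tools.DistCpOptions;
  import org.apache.hadoop.tools.OptionsParser;

  public class AppendSketch {
    public static void main(String[] args) {
      DistCpOptions options = OptionsParser.parse(new String[] {
          "-update", "-append",
          "hdfs://localhost:8020/source/first",
          "hdfs://localhost:8020/target/" });
      Configuration conf = new Configuration();
      options.appendToConf(conf);
      // Both APPEND and SYNC_FOLDERS are now set in the configuration.
      System.out.println(
          conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false));
    }
  }
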
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Tue Aug 19 23:49:39 2014
@@ -25,11 +25,13 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
@@ -42,6 +44,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
@@ -117,6 +120,16 @@ public class TestCopyMapper {
touchFile(SOURCE_PATH + "/7/8/9");
}
+ private static void appendSourceData() throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+ for (Path source : pathList) {
+ if (fs.getFileStatus(source).isFile()) {
+ // append 2048 bytes per file
+ appendFile(source, DEFAULT_FILE_SIZE * 2);
+ }
+ }
+ }
+
private static void createSourceDataWithDifferentBlockSize() throws Exception {
mkdirs(SOURCE_PATH + "/1");
mkdirs(SOURCE_PATH + "/2");
@@ -200,95 +213,141 @@ public class TestCopyMapper {
}
}
+ /**
+ * Append the specified number of bytes to the given file
+ */
+ private static void appendFile(Path p, int length) throws IOException {
+ byte[] toAppend = new byte[length];
+ Random random = new Random();
+ random.nextBytes(toAppend);
+ FSDataOutputStream out = cluster.getFileSystem().append(p);
+ try {
+ out.write(toAppend);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+
@Test
public void testCopyWithDifferentChecksumType() throws Exception {
testCopy(true);
}
@Test(timeout=40000)
- public void testRun() {
+ public void testRun() throws Exception {
testCopy(false);
}
- private void testCopy(boolean preserveChecksum) {
- try {
- deleteState();
- if (preserveChecksum) {
- createSourceDataWithDifferentChecksumType();
- } else {
- createSourceData();
- }
+ @Test
+ public void testCopyWithAppend() throws Exception {
+ final FileSystem fs = cluster.getFileSystem();
+ // do the first distcp
+ testCopy(false);
+ // start appending data to source
+ appendSourceData();
- FileSystem fs = cluster.getFileSystem();
- CopyMapper copyMapper = new CopyMapper();
- StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
- = stubContext.getContext();
+ // do the distcp again with -update and -append option
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+ stubContext.getContext();
+ // Enable append
+ context.getConfiguration().setBoolean(
+ DistCpOptionSwitch.APPEND.getConfigLabel(), true);
+ copyMapper.setup(context);
+ for (Path path: pathList) {
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
+ path)), context);
+ }
+
+ verifyCopy(fs, false);
+ // verify that we only copied new appended data
+ Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
+ Assert.assertEquals(pathList.size(), stubContext.getReporter().
+ getCounter(CopyMapper.Counter.COPY).getValue());
+ }
+
+ private void testCopy(boolean preserveChecksum) throws Exception {
+ deleteState();
+ if (preserveChecksum) {
+ createSourceDataWithDifferentChecksumType();
+ } else {
+ createSourceData();
+ }
- Configuration configuration = context.getConfiguration();
- EnumSet<DistCpOptions.FileAttribute> fileAttributes
- = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
- if (preserveChecksum) {
- fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
- }
- configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
- DistCpUtils.packAttributes(fileAttributes));
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes
+ = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
+ if (preserveChecksum) {
+ fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+ }
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+ DistCpUtils.packAttributes(fileAttributes));
- copyMapper.setup(context);
+ copyMapper.setup(context);
- for (Path path: pathList) {
- copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
- fs.getFileStatus(path), context);
- }
+ for (Path path: pathList) {
+ copyMapper.map(
+ new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ new CopyListingFileStatus(fs.getFileStatus(path)), context);
+ }
- // Check that the maps worked.
- for (Path path : pathList) {
- final Path targetPath = new Path(path.toString()
- .replaceAll(SOURCE_PATH, TARGET_PATH));
- Assert.assertTrue(fs.exists(targetPath));
- Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
- FileStatus sourceStatus = fs.getFileStatus(path);
- FileStatus targetStatus = fs.getFileStatus(targetPath);
- Assert.assertEquals(sourceStatus.getReplication(),
- targetStatus.getReplication());
- if (preserveChecksum) {
- Assert.assertEquals(sourceStatus.getBlockSize(),
- targetStatus.getBlockSize());
- }
- Assert.assertTrue(!fs.isFile(targetPath)
- || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
- }
-
- Assert.assertEquals(pathList.size(),
- stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
- if (!preserveChecksum) {
- Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
- .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
- .getValue());
- } else {
- Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
- .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
- .getValue());
- }
-
- testCopyingExistingFiles(fs, copyMapper, context);
- for (Text value : stubContext.getWriter().values()) {
- Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
- }
+ // Check that the maps worked.
+ verifyCopy(fs, preserveChecksum);
+ Assert.assertEquals(pathList.size(), stubContext.getReporter()
+ .getCounter(CopyMapper.Counter.COPY).getValue());
+ if (!preserveChecksum) {
+ Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
+ } else {
+ Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
}
- catch (Exception e) {
- LOG.error("Unexpected exception: ", e);
- Assert.assertTrue(false);
+
+ testCopyingExistingFiles(fs, copyMapper, context);
+ for (Text value : stubContext.getWriter().values()) {
+ Assert.assertTrue(value.toString() + " is not skipped", value
+ .toString().startsWith("SKIP:"));
}
}
- private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
- Mapper<Text, FileStatus, Text, Text>.Context context) {
+ private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+ throws Exception {
+ for (Path path : pathList) {
+ final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
+ TARGET_PATH));
+ Assert.assertTrue(fs.exists(targetPath));
+ Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+ FileStatus sourceStatus = fs.getFileStatus(path);
+ FileStatus targetStatus = fs.getFileStatus(targetPath);
+ Assert.assertEquals(sourceStatus.getReplication(),
+ targetStatus.getReplication());
+ if (preserveChecksum) {
+ Assert.assertEquals(sourceStatus.getBlockSize(),
+ targetStatus.getBlockSize());
+ }
+ Assert.assertTrue(!fs.isFile(targetPath)
+ || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
+ }
+ }
+ private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
try {
for (Path path : pathList) {
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
- fs.getFileStatus(path), context);
+ new CopyListingFileStatus(fs.getFileStatus(path)), context);
}
Assert.assertEquals(nFiles,
@@ -309,7 +368,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@@ -320,7 +379,7 @@ public class TestCopyMapper {
copyMapper.setup(context);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
- fs.getFileStatus(pathList.get(0)), context);
+ new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context);
Assert.assertTrue("There should have been an exception.", false);
}
@@ -343,7 +402,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
mkdirs(SOURCE_PATH + "/src/file");
@@ -351,7 +410,8 @@ public class TestCopyMapper {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
- fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ new CopyListingFileStatus(fs.getFileStatus(
+ new Path(SOURCE_PATH + "/src/file"))),
context);
} catch (IOException e) {
Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
@@ -372,22 +432,25 @@ public class TestCopyMapper {
final CopyMapper copyMapper = new CopyMapper();
- final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
- doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
- @Override
- public Mapper<Text, FileStatus, Text, Text>.Context run() {
- try {
- StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- return stubContext.getContext();
- } catch (Exception e) {
- LOG.error("Exception encountered ", e);
- throw new RuntimeException(e);
- }
- }
- });
+ final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+ tmpUser.doAs(
+ new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
+ @Override
+ public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
+ try {
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ return stubContext.getContext();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
EnumSet.allOf(DistCpOptions.FileAttribute.class);
+ preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
+ preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
DistCpUtils.packAttributes(preserveStatus));
@@ -415,7 +478,8 @@ public class TestCopyMapper {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
- tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ new CopyListingFileStatus(tmpFS.getFileStatus(
+ new Path(SOURCE_PATH + "/src/file"))),
context);
Assert.fail("Expected copy to fail");
} catch (AccessControlException e) {
@@ -442,19 +506,20 @@ public class TestCopyMapper {
final CopyMapper copyMapper = new CopyMapper();
- final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
- doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
- @Override
- public Mapper<Text, FileStatus, Text, Text>.Context run() {
- try {
- StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- return stubContext.getContext();
- } catch (Exception e) {
- LOG.error("Exception encountered ", e);
- throw new RuntimeException(e);
- }
- }
- });
+ final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+ tmpUser.doAs(
+ new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
+ @Override
+ public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
+ try {
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ return stubContext.getContext();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
touchFile(SOURCE_PATH + "/src/file");
mkdirs(TARGET_PATH);
@@ -481,7 +546,8 @@ public class TestCopyMapper {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
- tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ new CopyListingFileStatus(tmpFS.getFileStatus(
+ new Path(SOURCE_PATH + "/src/file"))),
context);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -518,9 +584,12 @@ public class TestCopyMapper {
}
});
- final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+ final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+ stubContext.getContext();
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
EnumSet.allOf(DistCpOptions.FileAttribute.class);
+ preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
+ preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
DistCpUtils.packAttributes(preserveStatus));
@@ -551,7 +620,8 @@ public class TestCopyMapper {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
- tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ new CopyListingFileStatus(tmpFS.getFileStatus(
+ new Path(SOURCE_PATH + "/src/file"))),
context);
Assert.assertEquals(stubContext.getWriter().values().size(), 1);
Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
@@ -594,8 +664,10 @@ public class TestCopyMapper {
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
EnumSet.allOf(DistCpOptions.FileAttribute.class);
+ preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
+ preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
- final Mapper<Text, FileStatus, Text, Text>.Context context
+ final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
@@ -629,7 +701,8 @@ public class TestCopyMapper {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
- tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ new CopyListingFileStatus(tmpFS.getFileStatus(
+ new Path(SOURCE_PATH + "/src/file"))),
context);
Assert.fail("Didn't expect the file to be copied");
} catch (AccessControlException ignore) {
@@ -661,7 +734,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
touchFile(SOURCE_PATH + "/src/file");
@@ -669,7 +742,8 @@ public class TestCopyMapper {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
- fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+ new CopyListingFileStatus(fs.getFileStatus(
+ new Path(SOURCE_PATH + "/src/file"))),
context);
} catch (IOException e) {
Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
@@ -688,7 +762,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@@ -705,7 +779,7 @@ public class TestCopyMapper {
if (!fileStatus.isDirectory()) {
fs.delete(path, true);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
- fileStatus, context);
+ new CopyListingFileStatus(fileStatus), context);
}
}
if (ignoreFailures) {
@@ -745,7 +819,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@@ -759,7 +833,7 @@ public class TestCopyMapper {
for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
- fileStatus, context);
+ new CopyListingFileStatus(fileStatus), context);
}
Assert.fail("Copy should have failed because of block-size difference.");
@@ -780,7 +854,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@@ -798,7 +872,7 @@ public class TestCopyMapper {
for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
- fileStatus, context);
+ new CopyListingFileStatus(fileStatus), context);
}
// Check that the block-size/replication aren't preserved.
@@ -855,7 +929,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
context.getConfiguration().set(
@@ -863,7 +937,8 @@ public class TestCopyMapper {
targetFilePath.getParent().toString()); // Parent directory.
copyMapper.setup(context);
- final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
+ final CopyListingFileStatus sourceFileStatus = new CopyListingFileStatus(
+ fs.getFileStatus(sourceFilePath));
long before = fs.getFileStatus(targetFilePath).getModificationTime();
copyMapper.map(new Text(DistCpUtils.getRelativePath(
@@ -907,7 +982,7 @@ public class TestCopyMapper {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, FileStatus, Text, Text>.Context context
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@@ -926,7 +1001,7 @@ public class TestCopyMapper {
for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
- fileStatus, context);
+ new CopyListingFileStatus(fileStatus), context);
}
// Check that the user/group attributes are preserved
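
[Editor's sketch, not part of this commit: the recurring pattern in these test changes is wrapping each FileStatus in a CopyListingFileStatus before handing it to CopyMapper.map(). A condensed, illustrative helper assuming the test fixtures above (cluster FileSystem, pathList, SOURCE_PATH); it is a fragment of the test flow, not a standalone program.]

  private static void runMapper(FileSystem fs, Configuration conf,
      List<Path> pathList, String sourcePath) throws Exception {
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(conf, null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
        stubContext.getContext();
    copyMapper.setup(context);
    for (Path path : pathList) {
      // Every call site now wraps the FileStatus before invoking map().
      copyMapper.map(
          new Text(DistCpUtils.getRelativePath(new Path(sourcePath), path)),
          new CopyListingFileStatus(fs.getFileStatus(path)), context);
    }
  }
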
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -48,8 +49,8 @@ public class TestRetriableFileCopyComman
Exception actualEx = null;
try {
- new RetriableFileCopyCommand("testFailOnCloseError")
- .copyBytes(stat, out, 512, context);
+ new RetriableFileCopyCommand("testFailOnCloseError", FileAction.OVERWRITE)
+ .copyBytes(stat, 0, out, 512, context);
} catch (Exception e) {
actualEx = e;
}
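The constructor now carries a CopyMapper.FileAction, and copyBytes() grows a long source-offset parameter ahead of the output stream. A hedged restatement of the new call shape, reusing the test's mocked stat/out/context objects; the offset semantics are inferred, not stated in this diff:

    // Offset 0 copies from the start of the source; a non-zero offset
    // would plausibly let an APPEND-style action resume mid-file.
    new RetriableFileCopyCommand("example", FileAction.OVERWRITE)
        .copyBytes(stat, 0, out, 512, context);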
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.security.Credentials;
@@ -122,8 +123,8 @@ public class TestUniformSizeInputFormat
for (int i=0; i<splits.size(); ++i) {
InputSplit split = splits.get(i);
int currentSplitSize = 0;
- RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
- split, null);
+ RecordReader<Text, CopyListingFileStatus> recordReader =
+ uniformSizeInputFormat.createRecordReader(split, null);
StubContext stubContext = new StubContext(jobContext.getConfiguration(),
recordReader, 0);
final TaskAttemptContext taskAttemptContext
@@ -168,7 +169,7 @@ public class TestUniformSizeInputFormat
try {
reader.seek(lastEnd);
- FileStatus srcFileStatus = new FileStatus();
+ CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
} finally {
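The tail of this hunk re-reads the copy listing with a reusable CopyListingFileStatus value. A short sketch of that read loop, assuming (as the surrounding code suggests) the listing is a SequenceFile of Text keys and CopyListingFileStatus values; listingPath is a hypothetical name for the listing file:

    SequenceFile.Reader reader =
        new SequenceFile.Reader(fs, listingPath, configuration);
    try {
      Text srcRelPath = new Text();
      CopyListingFileStatus srcStatus = new CopyListingFileStatus();
      while (reader.next(srcRelPath, srcStatus)) {
        // Each record maps a source-relative path to its listed status.
        System.out.println(srcRelPath + " -> " + srcStatus.getPath());
      }
    } finally {
      IOUtils.closeStream(reader);
    }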
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java Tue Aug 19 23:49:39 2014
@@ -18,19 +18,20 @@
package org.apache.hadoop.tools.mapred.lib;
-import junit.framework.Assert;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.security.Credentials;
@@ -117,15 +118,15 @@ public class TestDynamicInputFormat {
+"/tmp/testDynInputFormat/fileList.seq"), options);
JobContext jobContext = new JobContextImpl(configuration, new JobID());
- DynamicInputFormat<Text, FileStatus> inputFormat =
- new DynamicInputFormat<Text, FileStatus>();
+ DynamicInputFormat<Text, CopyListingFileStatus> inputFormat =
+ new DynamicInputFormat<Text, CopyListingFileStatus>();
List<InputSplit> splits = inputFormat.getSplits(jobContext);
int nFiles = 0;
int taskId = 0;
for (InputSplit split : splits) {
- RecordReader<Text, FileStatus> recordReader =
+ RecordReader<Text, CopyListingFileStatus> recordReader =
inputFormat.createRecordReader(split, null);
StubContext stubContext = new StubContext(jobContext.getConfiguration(),
recordReader, taskId);
@@ -135,7 +136,7 @@ public class TestDynamicInputFormat {
recordReader.initialize(splits.get(0), taskAttemptContext);
float previousProgressValue = 0f;
while (recordReader.nextKeyValue()) {
- FileStatus fileStatus = recordReader.getCurrentValue();
+ CopyListingFileStatus fileStatus = recordReader.getCurrentValue();
String source = fileStatus.getPath().toString();
System.out.println(source);
Assert.assertTrue(expectedFilePaths.contains(source));
@@ -160,5 +161,25 @@ public class TestDynamicInputFormat {
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+
+ // Tests with negative value configuration
+ Configuration conf = new Configuration();
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, -1);
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, -1);
+ conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, -1);
+ conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, -1);
+ Assert.assertEquals(1,
+ DynamicInputFormat.getSplitRatio(1, 1000000000, conf));
+ Assert.assertEquals(2,
+ DynamicInputFormat.getSplitRatio(11000000, 10, conf));
+ Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700, conf));
+ Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200, conf));
+
+ // Tests with valid configuration
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, 100);
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, 30);
+ conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, 10);
+ conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
+ Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
}
}
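The added assertions pin down the new Configuration-aware getSplitRatio overload: negative values for the four tunables fall back to the same answers as the two-argument form, while a valid positive CONF_LABEL_SPLIT_RATIO (53) is honored. One plausible sketch of fallback logic consistent with those assertions; this is hedged guesswork, not the actual DynamicInputFormat code:

    // Hedged sketch only: reproduces the observed test behavior.
    static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
      int configured = conf.getInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, -1);
      if (configured > 0) {
        return configured;   // explicit positive setting wins (53 above)
      }
      // Negative or absent tunables: defer to the default heuristic,
      // matching the no-conf assertions.
      return getSplitRatio(nMaps, nRecords);
    }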
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
@@ -106,7 +107,8 @@ public class TestDistCpUtils {
Path src = new Path("/tmp/src");
fs.mkdirs(path);
fs.mkdirs(src);
- FileStatus srcStatus = fs.getFileStatus(src);
+ CopyListingFileStatus srcStatus = new CopyListingFileStatus(
+ fs.getFileStatus(src));
FsPermission noPerm = new FsPermission((short) 0);
fs.setPermission(path, noPerm);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml Tue Aug 19 23:49:39 2014
@@ -1,8 +1,6 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
- Copyright 2002-2004 The Apache Software Foundation
-
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java Tue Aug 19 23:49:39 2014
@@ -376,7 +376,7 @@ public class TestGridMixClasses {
/*
* test LoadSortComparator
*/
- @Test (timeout=1000)
+ @Test (timeout=3000)
public void testLoadJobLoadSortComparator() throws Exception {
LoadJob.LoadSortComparator test = new LoadJob.LoadSortComparator();
@@ -409,7 +409,7 @@ public class TestGridMixClasses {
/*
* test SpecGroupingComparator
*/
- @Test (timeout=1000)
+ @Test (timeout=3000)
public void testGridmixJobSpecGroupingComparator() throws Exception {
GridmixJob.SpecGroupingComparator test = new GridmixJob.SpecGroupingComparator();
@@ -452,7 +452,7 @@ public class TestGridMixClasses {
/*
* test CompareGridmixJob: only equals and compare
*/
- @Test (timeout=10000)
+ @Test (timeout=30000)
public void testCompareGridmixJob() throws Exception {
Configuration conf = new Configuration();
Path outRoot = new Path("target");
@@ -478,7 +478,7 @@ public class TestGridMixClasses {
/*
* test ReadRecordFactory. Should read all data from the input stream.
*/
- @Test (timeout=1000)
+ @Test (timeout=3000)
public void testReadRecordFactory() throws Exception {
// RecordFactory factory, InputStream src, Configuration conf
@@ -589,7 +589,7 @@ public class TestGridMixClasses {
/*
* test LoadRecordReader. This class reads data from files.
*/
- @Test (timeout=1000)
+ @Test (timeout=3000)
public void testLoadJobLoadRecordReader() throws Exception {
LoadJob.LoadRecordReader test = new LoadJob.LoadRecordReader();
Configuration conf = new Configuration();
@@ -652,7 +652,7 @@ public class TestGridMixClasses {
* test LoadReducer
*/
- @Test (timeout=1000)
+ @Test (timeout=3000)
public void testLoadJobLoadReducer() throws Exception {
LoadJob.LoadReducer test = new LoadJob.LoadReducer();
@@ -772,7 +772,7 @@ public class TestGridMixClasses {
/*
* test SerialJobFactory
*/
- @Test (timeout=40000)
+ @Test (timeout=120000)
public void testSerialReaderThread() throws Exception {
Configuration conf = new Configuration();
@@ -833,7 +833,7 @@ public class TestGridMixClasses {
* test SleepMapper
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- @Test (timeout=10000)
+ @Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
@@ -878,7 +878,7 @@ public class TestGridMixClasses {
/*
* test SleepReducer
*/
- @Test (timeout=1000)
+ @Test (timeout=3000)
public void testSleepReducer() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Tue Aug 19 23:49:39 2014
@@ -140,10 +140,10 @@ public class TestGridmixRecord {
final int chk = WritableComparator.compareBytes(
out1.getData(), 0, out1.getLength(),
out2.getData(), 0, out2.getLength());
- assertEquals(chk, x.compareTo(y));
- assertEquals(chk, cmp.compare(
+ assertEquals(Integer.signum(chk), Integer.signum(x.compareTo(y)));
+ assertEquals(Integer.signum(chk), Integer.signum(cmp.compare(
out1.getData(), 0, out1.getLength(),
- out2.getData(), 0, out2.getLength()));
+ out2.getData(), 0, out2.getLength())));
// write second copy, compare eq
final int s1 = out1.getLength();
x.write(out1);
@@ -153,8 +153,8 @@ public class TestGridmixRecord {
y.write(out2);
assertEquals(0, cmp.compare(out2.getData(), 0, s2,
out2.getData(), s2, out2.getLength() - s2));
- assertEquals(chk, cmp.compare(out1.getData(), 0, s1,
- out2.getData(), s2, out2.getLength() - s2));
+ assertEquals(Integer.signum(chk), Integer.signum(cmp.compare(out1.getData(), 0, s1,
+ out2.getData(), s2, out2.getLength() - s2)));
}
}
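Normalizing through Integer.signum is the right fix: the Comparable/Comparator contract only specifies the sign of the result, never its magnitude, so asserting raw equality against WritableComparator.compareBytes coupled the test to one implementation's arithmetic. The trap in miniature:

    int byCharDiff = "z".compareTo("a");        // 25: magnitude is an
                                                // implementation detail
    int byCompare  = Integer.compare('z', 'a'); // 1 in practice; only the
                                                // sign is guaranteed
    // Raw equality of the two would fail; the sign agrees for any
    // contract-abiding comparator.
    assert Integer.signum(byCharDiff) == Integer.signum(byCompare);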
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml Tue Aug 19 23:49:39 2014
@@ -146,6 +146,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java Tue Aug 19 23:49:39 2014
@@ -19,9 +19,11 @@
package org.apache.hadoop.fs.swift.snative;
import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.swift.exceptions.SwiftConnectionClosedException;
+import java.io.EOFException;
import java.io.IOException;
/**
@@ -37,10 +39,10 @@ public class StrictBufferedFSInputStream
@Override
public void seek(long pos) throws IOException {
if (pos < 0) {
- throw new IOException("Negative position");
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
if (in == null) {
- throw new SwiftConnectionClosedException("Stream closed");
+ throw new SwiftConnectionClosedException(FSExceptionMessages.STREAM_IS_CLOSED);
}
super.seek(pos);
}
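Two behavioral tightenings land here: a negative seek now raises EOFException with the shared FSExceptionMessages.NEGATIVE_SEEK text instead of a bare IOException, and the closed-stream message is standardized the same way. That aligns the Swift stream with the common FileSystem seek contract, letting callers tell a bad offset apart from a transport failure. A hedged caller-side sketch built on that contract; 'in' and requestedPos are hypothetical:

    try {
      in.seek(requestedPos);     // any FSInputStream-style stream
    } catch (EOFException e) {
      // Out-of-range request (for example a negative offset): a usage
      // error the caller can recover from, unlike a genuine I/O failure.
      in.seek(0);
    }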
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java Tue Aug 19 23:49:39 2014
@@ -25,14 +25,14 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException;
-import org.apache.hadoop.fs.swift.exceptions.SwiftNotDirectoryException;
import org.apache.hadoop.fs.swift.exceptions.SwiftOperationFailedException;
-import org.apache.hadoop.fs.swift.exceptions.SwiftPathExistsException;
import org.apache.hadoop.fs.swift.exceptions.SwiftUnsupportedFeatureException;
import org.apache.hadoop.fs.swift.http.SwiftProtocolConstants;
import org.apache.hadoop.fs.swift.util.DurationStats;
@@ -373,7 +373,7 @@ public class SwiftNativeFileSystem exten
* @param directory path to query
* @return true iff the directory should be created
* @throws IOException IO problems
- * @throws SwiftNotDirectoryException if the path references a file
+ * @throws ParentNotDirectoryException if the path references a file
*/
private boolean shouldCreate(Path directory) throws IOException {
FileStatus fileStatus;
@@ -388,9 +388,9 @@ public class SwiftNativeFileSystem exten
if (!SwiftUtils.isDirectory(fileStatus)) {
//if it's a file, raise an error
- throw new SwiftNotDirectoryException(directory,
- String.format(": can't mkdir since it exists and is not a directory: %s",
- fileStatus));
+ throw new ParentNotDirectoryException(
+ String.format("%s: can't mkdir since it exists and is not a directory: %s",
+ directory, fileStatus));
} else {
//path exists, and it is a directory
if (LOG.isDebugEnabled()) {
@@ -488,7 +488,7 @@ public class SwiftNativeFileSystem exten
//overwrite set -> delete the object.
store.delete(absolutePath, true);
} else {
- throw new SwiftPathExistsException("Path exists: " + file);
+ throw new FileAlreadyExistsException("Path exists: " + file);
}
} else {
// destination does not exist - trigger creation of the parent
@@ -580,6 +580,9 @@ public class SwiftNativeFileSystem exten
} catch (SwiftOperationFailedException e) {
//downgrade to a failure
return false;
+ } catch (FileAlreadyExistsException e) {
+ //downgrade to a failure
+ return false;
} catch (FileNotFoundException e) {
//downgrade to a failure
return false;
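The pattern across this file is to retire the Swift-specific exception types in favor of the standard FileSystem ones (ParentNotDirectoryException, FileAlreadyExistsException), and then, inside rename(), to downgrade the new FileAlreadyExistsException to a boolean failure just as the existing catches do, preserving FileSystem.rename's no-throw contract. A compressed sketch of that downgrade shape; names beyond the catch clauses are hypothetical:

    public boolean rename(Path src, Path dst) throws IOException {
      try {
        innerRename(src, dst);   // hypothetical: the throwing rename path
        return true;
      } catch (SwiftOperationFailedException e) {
        return false;            // downgrade to a failure
      } catch (FileAlreadyExistsException e) {
        return false;            // new case added by this change
      } catch (FileNotFoundException e) {
        return false;            // downgrade to a failure
      }
    }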
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import org.apache.commons.httpclient.Htt
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException;
@@ -590,7 +591,7 @@ public class SwiftNativeFileSystemStore
} else {
//outcome #1: dest is a file; fail if different
if (!renamingOnToSelf) {
- throw new SwiftOperationFailedException(
+ throw new FileAlreadyExistsException(
"cannot rename a file over one that already exists");
} else {
//is mv self self where self is a file. this becomes a no-op
@@ -633,7 +634,7 @@ public class SwiftNativeFileSystemStore
if (destExists && !destIsDir) {
// #1 destination is a file: fail
- throw new SwiftOperationFailedException(
+ throw new FileAlreadyExistsException(
"the source is a directory, but not the destination");
}
Path targetPath;
@@ -927,7 +928,7 @@ public class SwiftNativeFileSystemStore
}
if (LOG.isDebugEnabled()) {
- SwiftUtils.debug(LOG, SwiftUtils.fileStatsToString(statuses, "\n"));
+ SwiftUtils.debug(LOG, "%s", SwiftUtils.fileStatsToString(statuses, "\n"));
}
if (filecount == 1 && swiftPath.equals(statuses[0].getPath())) {
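The one-line SwiftUtils.debug change is a classic format-string fix: the old call passed the joined file listing as the format argument itself, so any '%' in a path would be parsed as a conversion specifier and could throw or mangle the log line. Routing it through a literal "%s" makes the listing inert data. The failure mode in miniature, with a hypothetical path:

    String path = "/container/100%_done";
    // String.format(path) throws UnknownFormatConversionException on "%_";
    // String.format("%s", path) prints the path verbatim.
    System.out.println(String.format("%s", path));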
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java Tue Aug 19 23:49:39 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.swift.snati
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -298,7 +299,8 @@ class SwiftNativeInputStream extends FSI
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos < 0) {
- throw new IOException("Negative Seek offset not supported");
+ throw new EOFException(
+ FSExceptionMessages.NEGATIVE_SEEK);
}
//there's some special handling of near-local data
//as the seek can be omitted if it is in/adjacent