You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/05/22 20:50:21 UTC
svn commit: r1596937 - in
/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src:
main/java/org/apache/hadoop/tools/ main/java/org/apache/hadoop/tools/mapred/
main/java/org/apache/hadoop/tools/util/ test/java/org/apache/hadoop/tools/
test/java...
Author: jing9
Date: Thu May 22 18:50:20 2014
New Revision: 1596937
URL: http://svn.apache.org/r1596937
Log:
MAPREDUCE-5899. Merge change r1596931 from trunk.
Modified:
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java Thu May 22 18:50:20 2014
@@ -50,6 +50,7 @@ public class DistCpConstants {
public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+ public static final String CONF_LABEL_APPEND = "distcp.copy.append";
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java Thu May 22 18:50:20 2014
@@ -138,6 +138,10 @@ public enum DistCpOptionSwitch {
new Option("overwrite", false, "Choose to overwrite target files " +
"unconditionally, even if they exist.")),
+ APPEND(DistCpConstants.CONF_LABEL_APPEND,
+ new Option("append", false,
+ "Reuse existing data in target files and append new data to them if possible")),
+
/**
* Should DisctpExecution be blocking
*/
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java Thu May 22 18:50:20 2014
@@ -39,6 +39,7 @@ public class DistCpOptions {
private boolean deleteMissing = false;
private boolean ignoreFailures = false;
private boolean overwrite = false;
+ private boolean append = false;
private boolean skipCRC = false;
private boolean blocking = true;
@@ -245,6 +246,22 @@ public class DistCpOptions {
}
/**
+ * @return whether we can append new data to target files
+ */
+ public boolean shouldAppend() {
+ return append;
+ }
+
+ /**
+ * Set whether to append new data to target files. This is valid only with the
+ * update option and when the CRC check is not skipped.
+ */
+ public void setAppend(boolean append) {
+ validate(DistCpOptionSwitch.APPEND, append);
+ this.append = append;
+ }
+
+ /**
* Should CRC/checksum check be skipped while checking files are identical
*
* @return true if checksum check should be skipped while checking files are
@@ -472,6 +489,7 @@ public class DistCpOptions {
value : this.atomicCommit);
boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
value : this.skipCRC);
+ boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
if (syncFolder && atomicCommit) {
throw new IllegalArgumentException("Atomic commit can't be used with " +
@@ -492,6 +510,14 @@ public class DistCpOptions {
throw new IllegalArgumentException("Skip CRC is valid only with update options");
}
+ if (!syncFolder && append) {
+ throw new IllegalArgumentException(
+ "Append is valid only with update options");
+ }
+ if (skipCRC && append) {
+ throw new IllegalArgumentException(
+ "Append is disallowed when skipping CRC");
+ }
}
/**
@@ -510,6 +536,8 @@ public class DistCpOptions {
String.valueOf(deleteMissing));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
String.valueOf(overwrite));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND,
+ String.valueOf(append));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
String.valueOf(skipCRC));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java Thu May 22 18:50:20 2014
@@ -140,6 +140,10 @@ public class OptionsParser {
option.setOverwrite(true);
}
+ if (command.hasOption(DistCpOptionSwitch.APPEND.getSwitch())) {
+ option.setAppend(true);
+ }
+
if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
option.setDeleteMissing(true);
}
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java Thu May 22 18:50:20 2014
@@ -18,13 +18,20 @@
package org.apache.hadoop.tools.mapred;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.EnumSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
@@ -36,11 +43,6 @@ import org.apache.hadoop.tools.DistCpOpt
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.StringUtils;
-import java.io.*;
-import java.util.EnumSet;
-import java.util.Arrays;
-import java.util.List;
-
/**
* Mapper class that executes the DistCp copy operation.
* Implements the o.a.h.mapreduce.Mapper<> interface.
@@ -62,6 +64,15 @@ public class CopyMapper extends Mapper<T
BYTESSKIPPED, // Number of bytes that were skipped from copy.
}
+ /**
+ * Indicate the action for each file
+ */
+ static enum FileAction {
+ SKIP, // Skip copying the file since it's already in the target FS
+ APPEND, // Only need to append new data to the file in the target FS
+ OVERWRITE, // Overwrite the whole file
+ }
+
private static Log LOG = LogFactory.getLog(CopyMapper.class);
private Configuration conf;
@@ -70,6 +81,7 @@ public class CopyMapper extends Mapper<T
private boolean ignoreFailures = false;
private boolean skipCrc = false;
private boolean overWrite = false;
+ private boolean append = false;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
private FileSystem targetFS = null;
@@ -90,6 +102,7 @@ public class CopyMapper extends Mapper<T
ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
+ append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
PRESERVE_STATUS.getConfigLabel()));
@@ -224,20 +237,19 @@ public class CopyMapper extends Mapper<T
return;
}
- if (skipFile(sourceFS, sourceCurrStatus, target)) {
+ FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
+ if (action == FileAction.SKIP) {
LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
+ " to " + target);
updateSkipCounters(context, sourceCurrStatus);
context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
- }
- else {
+ } else {
copyFileWithRetry(description, sourceCurrStatus, target, context,
- fileAttributes);
+ action, fileAttributes);
}
DistCpUtils.preserve(target.getFileSystem(conf), target,
sourceCurrStatus, fileAttributes);
-
} catch (IOException exception) {
handleFailures(exception, sourceFileStatus, target, context);
}
@@ -254,14 +266,14 @@ public class CopyMapper extends Mapper<T
return DistCpUtils.unpackAttributes(attributeString);
}
- private void copyFileWithRetry(String description, FileStatus sourceFileStatus,
- Path target, Context context,
- EnumSet<DistCpOptions.FileAttribute> fileAttributes) throws IOException {
-
+ private void copyFileWithRetry(String description,
+ FileStatus sourceFileStatus, Path target, Context context,
+ FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
+ throws IOException {
long bytesCopied;
try {
- bytesCopied = (Long)new RetriableFileCopyCommand(skipCrc, description)
- .execute(sourceFileStatus, target, context, fileAttributes);
+ bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
+ action).execute(sourceFileStatus, target, context, fileAttributes);
} catch (Exception e) {
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
@@ -311,25 +323,48 @@ public class CopyMapper extends Mapper<T
context.getCounter(counter).increment(value);
}
- private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target)
- throws IOException {
- return targetFS.exists(target)
- && !overWrite
- && !mustUpdate(sourceFS, source, target);
- }
-
- private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target)
- throws IOException {
- final FileStatus targetFileStatus = targetFS.getFileStatus(target);
-
- return syncFolders
- && (
- targetFileStatus.getLen() != source.getLen()
- || (!skipCrc &&
- !DistCpUtils.checksumsAreEqual(sourceFS,
- source.getPath(), null, targetFS, target))
- || (source.getBlockSize() != targetFileStatus.getBlockSize() &&
- preserve.contains(FileAttribute.BLOCKSIZE))
- );
+ private FileAction checkUpdate(FileSystem sourceFS, FileStatus source,
+ Path target) throws IOException {
+ final FileStatus targetFileStatus;
+ try {
+ targetFileStatus = targetFS.getFileStatus(target);
+ } catch (FileNotFoundException e) {
+ return FileAction.OVERWRITE;
+ }
+ if (targetFileStatus != null && !overWrite) {
+ if (canSkip(sourceFS, source, targetFileStatus)) {
+ return FileAction.SKIP;
+ } else if (append) {
+ long targetLen = targetFileStatus.getLen();
+ if (targetLen < source.getLen()) {
+ FileChecksum sourceChecksum = sourceFS.getFileChecksum(
+ source.getPath(), targetLen);
+ if (sourceChecksum != null
+ && sourceChecksum.equals(targetFS.getFileChecksum(target))) {
+ // We require that the checksum is not null. Thus currently only
+ // DistributedFileSystem is supported
+ return FileAction.APPEND;
+ }
+ }
+ }
+ }
+ return FileAction.OVERWRITE;
+ }
+
+ private boolean canSkip(FileSystem sourceFS, FileStatus source,
+ FileStatus target) throws IOException {
+ if (!syncFolders) {
+ return true;
+ }
+ boolean sameLength = target.getLen() == source.getLen();
+ boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
+ || !preserve.contains(FileAttribute.BLOCKSIZE);
+ if (sameLength && sameBlockSize) {
+ return skipCrc ||
+ DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
+ targetFS, target.getPath());
+ } else {
+ return false;
+ }
}
}
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java Thu May 22 18:50:20 2014
@@ -18,10 +18,8 @@
package org.apache.hadoop.tools.mapred;
-import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.EnumSet;
@@ -29,6 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.tools.util.ThrottledInputStream;
@@ -54,13 +55,15 @@ public class RetriableFileCopyCommand ex
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
private static int BUFFER_SIZE = 8 * 1024;
private boolean skipCrc = false;
+ private FileAction action;
/**
* Constructor, taking a description of the action.
* @param description Verbose description of the copy operation.
*/
- public RetriableFileCopyCommand(String description) {
+ public RetriableFileCopyCommand(String description, FileAction action) {
super(description);
+ this.action = action;
}
/**
@@ -68,9 +71,11 @@ public class RetriableFileCopyCommand ex
*
* @param skipCrc Whether to skip the crc check.
* @param description A verbose description of the copy operation.
+ * @param action Whether to overwrite the target file or append new data to it.
*/
- public RetriableFileCopyCommand(boolean skipCrc, String description) {
- this(description);
+ public RetriableFileCopyCommand(boolean skipCrc, String description,
+ FileAction action) {
+ this(description, action);
this.skipCrc = skipCrc;
}
@@ -96,18 +101,17 @@ public class RetriableFileCopyCommand ex
}
private long doCopy(FileStatus sourceFileStatus, Path target,
- Mapper.Context context,
- EnumSet<FileAttribute> fileAttributes)
- throws IOException {
-
- Path tmpTargetPath = getTmpFile(target, context);
+ Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
+ throws IOException {
+ final boolean toAppend = action == FileAction.APPEND;
+ Path targetPath = toAppend ? target : getTmpFile(target, context);
final Configuration configuration = context.getConfiguration();
FileSystem targetFS = target.getFileSystem(configuration);
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
- LOG.debug("Tmp-file path: " + tmpTargetPath);
+ LOG.debug("Target file path: " + targetPath);
}
final Path sourcePath = sourceFileStatus.getPath();
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
@@ -115,22 +119,31 @@ public class RetriableFileCopyCommand ex
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
.getFileChecksum(sourcePath) : null;
- long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
- context, fileAttributes, sourceChecksum);
+ final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
+ target).getLen() : 0;
+ long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus,
+ offset, context, fileAttributes, sourceChecksum);
- compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
- bytesRead);
+ compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead
+ + offset);
//At this point, src&dest lengths are same. if length==0, we skip checksum
if ((bytesRead != 0) && (!skipCrc)) {
compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
- targetFS, tmpTargetPath);
+ targetFS, targetPath);
+ }
+ // In the non-append case the data was written to a temporary file, so
+ // rename it to the target path.
+ if (!toAppend) {
+ promoteTmpToTarget(targetPath, target, targetFS);
}
- promoteTmpToTarget(tmpTargetPath, target, targetFS);
return bytesRead;
-
} finally {
- if (targetFS.exists(tmpTargetPath))
- targetFS.delete(tmpTargetPath, false);
+ // note that for append case, it is possible that we append partial data
+ // and then fail. In that case, for the next retry, we either reuse the
+ // partial appended data if it is good or we overwrite the whole file
+ if (!toAppend && targetFS.exists(targetPath)) {
+ targetFS.delete(targetPath, false);
+ }
}
}
@@ -147,29 +160,37 @@ public class RetriableFileCopyCommand ex
return null;
}
- private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
- FileStatus sourceFileStatus, Mapper.Context context,
+ private long copyToFile(Path targetPath, FileSystem targetFS,
+ FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
throws IOException {
FsPermission permission = FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(targetFS.getConf()));
- OutputStream outStream = new BufferedOutputStream(
- targetFS.create(tmpTargetPath, permission,
- EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
- getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
- tmpTargetPath),
- getBlockSize(fileAttributes, sourceFileStatus, targetFS,
- tmpTargetPath),
- context, getChecksumOpt(fileAttributes, sourceChecksum)));
- return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
+ final OutputStream outStream;
+ if (action == FileAction.OVERWRITE) {
+ final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
+ targetFS, targetPath);
+ final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
+ targetFS, targetPath);
+ FSDataOutputStream out = targetFS.create(targetPath, permission,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ BUFFER_SIZE, repl, blockSize, context,
+ getChecksumOpt(fileAttributes, sourceChecksum));
+ outStream = new BufferedOutputStream(out);
+ } else {
+ outStream = new BufferedOutputStream(targetFS.append(targetPath,
+ BUFFER_SIZE));
+ }
+ return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
+ context);
}
private void compareFileLengths(FileStatus sourceFileStatus, Path target,
- Configuration configuration, long bytesRead)
+ Configuration configuration, long targetLen)
throws IOException {
final Path sourcePath = sourceFileStatus.getPath();
FileSystem fs = sourcePath.getFileSystem(configuration);
- if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
+ if (fs.getFileStatus(sourcePath).getLen() != targetLen)
throw new IOException("Mismatch in length of source:" + sourcePath
+ " and target:" + target);
}
@@ -215,8 +236,8 @@ public class RetriableFileCopyCommand ex
}
@VisibleForTesting
- long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
- int bufferSize, Mapper.Context context)
+ long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
+ OutputStream outStream, int bufferSize, Mapper.Context context)
throws IOException {
Path source = sourceFileStatus.getPath();
byte buf[] = new byte[bufferSize];
@@ -225,19 +246,21 @@ public class RetriableFileCopyCommand ex
try {
inStream = getInputStream(source, context.getConfiguration());
- int bytesRead = readBytes(inStream, buf);
+ int bytesRead = readBytes(inStream, buf, sourceOffset);
while (bytesRead >= 0) {
totalBytesRead += bytesRead;
+ if (action == FileAction.APPEND) {
+ sourceOffset += bytesRead;
+ }
outStream.write(buf, 0, bytesRead);
updateContextStatus(totalBytesRead, context, sourceFileStatus);
- bytesRead = inStream.read(buf);
+ bytesRead = readBytes(inStream, buf, sourceOffset);
}
outStream.close();
outStream = null;
} finally {
IOUtils.cleanup(LOG, outStream, inStream);
}
-
return totalBytesRead;
}
@@ -254,24 +277,27 @@ public class RetriableFileCopyCommand ex
context.setStatus(message.toString());
}
- private static int readBytes(InputStream inStream, byte buf[])
- throws IOException {
+ private static int readBytes(ThrottledInputStream inStream, byte buf[],
+ long position) throws IOException {
try {
- return inStream.read(buf);
- }
- catch (IOException e) {
+ if (position == 0) {
+ return inStream.read(buf);
+ } else {
+ return inStream.read(position, buf, 0, buf.length);
+ }
+ } catch (IOException e) {
throw new CopyReadException(e);
}
}
- private static ThrottledInputStream getInputStream(Path path, Configuration conf)
- throws IOException {
+ private static ThrottledInputStream getInputStream(Path path,
+ Configuration conf) throws IOException {
try {
FileSystem fs = path.getFileSystem(conf);
long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
DistCpConstants.DEFAULT_BANDWIDTH_MB);
- return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
- bandwidthMB * 1024 * 1024);
+ FSDataInputStream in = fs.open(path);
+ return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
}
catch (IOException e) {
throw new CopyReadException(e);
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java Thu May 22 18:50:20 2014
@@ -21,6 +21,11 @@ package org.apache.hadoop.tools.util;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+
+import com.google.common.base.Preconditions;
+
/**
* The ThrottleInputStream provides bandwidth throttling on a specified
* InputStream. It is implemented as a wrapper on top of another InputStream
@@ -90,6 +95,25 @@ public class ThrottledInputStream extend
return readLen;
}
+ /**
+ * Read bytes starting from the specified position. This requires that rawStream
+ * is an instance of {@link PositionedReadable}.
+ */
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ if (!(rawStream instanceof PositionedReadable)) {
+ throw new UnsupportedOperationException(
+ "positioned read is not supported by the internal stream");
+ }
+ throttle();
+ int readLen = ((PositionedReadable) rawStream).read(position, buffer,
+ offset, length);
+ if (readLen != -1) {
+ bytesRead += readLen;
+ }
+ return readLen;
+ }
+
private void throttle() throws IOException {
if (getBytesPerSec() > maxBytesPerSec) {
try {
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java Thu May 22 18:50:20 2014
@@ -18,9 +18,12 @@
package org.apache.hadoop.tools;
+import static org.junit.Assert.fail;
+
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.DistCpOptions.*;
import org.apache.hadoop.conf.Configuration;
@@ -554,4 +557,45 @@ public class TestOptionsParser {
Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
}
+
+ @Test
+ public void testAppendOption() {
+ Configuration conf = new Configuration();
+ Assert.assertFalse(conf.getBoolean(
+ DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+ Assert.assertFalse(conf.getBoolean(
+ DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+ DistCpOptions options = OptionsParser.parse(new String[] { "-update",
+ "-append", "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/" });
+ options.appendToConf(conf);
+ Assert.assertTrue(conf.getBoolean(
+ DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+ Assert.assertTrue(conf.getBoolean(
+ DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+ // make sure -append is only valid when -update is specified
+ try {
+ options = OptionsParser.parse(new String[] { "-append",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/" });
+ fail("Append should fail if update option is not specified");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Append is valid only with update options", e);
+ }
+
+ // make sure -append is invalid when skipCrc is specified
+ try {
+ options = OptionsParser.parse(new String[] {
+ "-append", "-update", "-skipcrccheck",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/" });
+ fail("Append should fail if skipCrc option is specified");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Append is disallowed when skipping CRC", e);
+ }
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Thu May 22 18:50:20 2014
@@ -25,11 +25,13 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
@@ -118,6 +120,16 @@ public class TestCopyMapper {
touchFile(SOURCE_PATH + "/7/8/9");
}
+ private static void appendSourceData() throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+ for (Path source : pathList) {
+ if (fs.getFileStatus(source).isFile()) {
+ // append 2048 bytes per file
+ appendFile(source, DEFAULT_FILE_SIZE * 2);
+ }
+ }
+ }
+
private static void createSourceDataWithDifferentBlockSize() throws Exception {
mkdirs(SOURCE_PATH + "/1");
mkdirs(SOURCE_PATH + "/2");
@@ -201,85 +213,132 @@ public class TestCopyMapper {
}
}
+ /**
+ * Append specified length of bytes to a given file
+ */
+ private static void appendFile(Path p, int length) throws IOException {
+ byte[] toAppend = new byte[length];
+ Random random = new Random();
+ random.nextBytes(toAppend);
+ FSDataOutputStream out = cluster.getFileSystem().append(p);
+ try {
+ out.write(toAppend);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+
@Test
public void testCopyWithDifferentChecksumType() throws Exception {
testCopy(true);
}
@Test(timeout=40000)
- public void testRun() {
+ public void testRun() throws Exception {
testCopy(false);
}
- private void testCopy(boolean preserveChecksum) {
- try {
- deleteState();
- if (preserveChecksum) {
- createSourceDataWithDifferentChecksumType();
- } else {
- createSourceData();
- }
+ @Test
+ public void testCopyWithAppend() throws Exception {
+ final FileSystem fs = cluster.getFileSystem();
+ // do the first distcp
+ testCopy(false);
+ // start appending data to source
+ appendSourceData();
- FileSystem fs = cluster.getFileSystem();
- CopyMapper copyMapper = new CopyMapper();
- StubContext stubContext = new StubContext(getConfiguration(), null, 0);
- Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
- = stubContext.getContext();
+ // do the distcp again with -update and -append option
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+ stubContext.getContext();
+ // Enable append
+ context.getConfiguration().setBoolean(
+ DistCpOptionSwitch.APPEND.getConfigLabel(), true);
+ copyMapper.setup(context);
+ for (Path path: pathList) {
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
+ path)), context);
+ }
+
+ verifyCopy(fs, false);
+ // verify that we only copied new appended data
+ Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
+ Assert.assertEquals(pathList.size(), stubContext.getReporter().
+ getCounter(CopyMapper.Counter.COPY).getValue());
+ }
+
+ private void testCopy(boolean preserveChecksum) throws Exception {
+ deleteState();
+ if (preserveChecksum) {
+ createSourceDataWithDifferentChecksumType();
+ } else {
+ createSourceData();
+ }
- Configuration configuration = context.getConfiguration();
- EnumSet<DistCpOptions.FileAttribute> fileAttributes
- = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
- if (preserveChecksum) {
- fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
- }
- configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
- DistCpUtils.packAttributes(fileAttributes));
+ FileSystem fs = cluster.getFileSystem();
+ CopyMapper copyMapper = new CopyMapper();
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+ = stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes
+ = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
+ if (preserveChecksum) {
+ fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+ }
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+ DistCpUtils.packAttributes(fileAttributes));
- copyMapper.setup(context);
+ copyMapper.setup(context);
- for (Path path: pathList) {
- copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
- new CopyListingFileStatus(fs.getFileStatus(path)), context);
- }
+ for (Path path: pathList) {
+ copyMapper.map(
+ new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+ new CopyListingFileStatus(fs.getFileStatus(path)), context);
+ }
- // Check that the maps worked.
- for (Path path : pathList) {
- final Path targetPath = new Path(path.toString()
- .replaceAll(SOURCE_PATH, TARGET_PATH));
- Assert.assertTrue(fs.exists(targetPath));
- Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
- FileStatus sourceStatus = fs.getFileStatus(path);
- FileStatus targetStatus = fs.getFileStatus(targetPath);
- Assert.assertEquals(sourceStatus.getReplication(),
- targetStatus.getReplication());
- if (preserveChecksum) {
- Assert.assertEquals(sourceStatus.getBlockSize(),
- targetStatus.getBlockSize());
- }
- Assert.assertTrue(!fs.isFile(targetPath)
- || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
- }
-
- Assert.assertEquals(pathList.size(),
- stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
- if (!preserveChecksum) {
- Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
- .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
- .getValue());
- } else {
- Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
- .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
- .getValue());
- }
-
- testCopyingExistingFiles(fs, copyMapper, context);
- for (Text value : stubContext.getWriter().values()) {
- Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
- }
+ // Check that the maps worked.
+ verifyCopy(fs, preserveChecksum);
+ Assert.assertEquals(pathList.size(), stubContext.getReporter()
+ .getCounter(CopyMapper.Counter.COPY).getValue());
+ if (!preserveChecksum) {
+ Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
+ } else {
+ Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
}
- catch (Exception e) {
- LOG.error("Unexpected exception: ", e);
- Assert.assertTrue(false);
+
+ testCopyingExistingFiles(fs, copyMapper, context);
+ for (Text value : stubContext.getWriter().values()) {
+ Assert.assertTrue(value.toString() + " is not skipped", value
+ .toString().startsWith("SKIP:"));
+ }
+ }
+
+ private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+ throws Exception {
+ for (Path path : pathList) {
+ final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
+ TARGET_PATH));
+ Assert.assertTrue(fs.exists(targetPath));
+ Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+ FileStatus sourceStatus = fs.getFileStatus(path);
+ FileStatus targetStatus = fs.getFileStatus(targetPath);
+ Assert.assertEquals(sourceStatus.getReplication(),
+ targetStatus.getReplication());
+ if (preserveChecksum) {
+ Assert.assertEquals(sourceStatus.getBlockSize(),
+ targetStatus.getBlockSize());
+ }
+ Assert.assertTrue(!fs.isFile(targetPath)
+ || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
}
}
Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java Thu May 22 18:50:20 2014
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -48,8 +49,8 @@ public class TestRetriableFileCopyComman
Exception actualEx = null;
try {
- new RetriableFileCopyCommand("testFailOnCloseError")
- .copyBytes(stat, out, 512, context);
+ new RetriableFileCopyCommand("testFailOnCloseError", FileAction.OVERWRITE)
+ .copyBytes(stat, 0, out, 512, context);
} catch (Exception e) {
actualEx = e;
}