Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/19 01:46:36 UTC
[hadoop] branch trunk updated: HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c765584 HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919)
c765584 is described below
commit c765584eb231f8482f5b90b7e8f61f9f7a931d09
Author: KAI XIE <gi...@gmail.com>
AuthorDate: Mon Aug 19 09:46:31 2019 +0800
HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919)
* DistCp to support checksum validation when copy blocks in parallel
* address review comments
* add checksums comparison test for combine mode
---
.../apache/hadoop/tools/mapred/CopyCommitter.java | 15 +-
.../tools/mapred/RetriableFileCopyCommand.java | 56 +------
.../org/apache/hadoop/tools/util/DistCpUtils.java | 60 ++++++++
.../hadoop/tools/mapred/TestCopyCommitter.java | 161 ++++++++++++++++++++-
.../apache/hadoop/tools/util/TestDistCpUtils.java | 67 +++++++++
.../tools/util/TestDistCpUtilsWithCombineMode.java | 115 +++++++++++++++
6 files changed, 412 insertions(+), 62 deletions(-)
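For context, here is a small illustrative sketch (not part of the commit; the paths and chunk count are placeholders) of how a client builds DistCp options that exercise the parallel block-copy path this change validates, mirroring the builder calls used in the new tests below:

import java.util.Collections;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;

public class ParallelCopyOptionsSketch {
  public static DistCpOptions build() {
    return new DistCpOptions.Builder(
        Collections.singletonList(new Path("/src")),
        new Path("/target"))
        // split each large file into chunks of 5 blocks and copy them in parallel
        .withBlocksPerChunk(5)
        // withCRC sets skipCrc; false keeps the post-copy checksum validation on
        .withCRC(false)
        .build();
  }
}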
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index d7a730d..546062f 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -73,6 +73,7 @@ public class CopyCommitter extends FileOutputCommitter {
private boolean overwrite = false;
private boolean targetPathExists = true;
private boolean ignoreFailures = false;
+ private boolean skipCrc = false;
private int blocksPerChunk = 0;
/**
@@ -87,6 +88,9 @@ public class CopyCommitter extends FileOutputCommitter {
blocksPerChunk = context.getConfiguration().getInt(
DistCpOptionSwitch.BLOCKS_PER_CHUNK.getConfigLabel(), 0);
LOG.debug("blocks per chunk {}", blocksPerChunk);
+ skipCrc = context.getConfiguration().getBoolean(
+ DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
+ LOG.debug("skip CRC is {}", skipCrc);
this.taskAttemptContext = context;
}
@@ -247,7 +251,8 @@ public class CopyCommitter extends FileOutputCommitter {
== srcFileStatus.getLen()) {
// This is the last chunk of the splits, consolidate allChunkPaths
try {
- concatFileChunks(conf, targetFile, allChunkPaths);
+ concatFileChunks(conf, srcFileStatus.getPath(), targetFile,
+ allChunkPaths);
} catch (IOException e) {
// If the concat failed because a chunk file doesn't exist,
// then we assume that the CopyMapper has skipped copying this
@@ -603,8 +608,9 @@ public class CopyCommitter extends FileOutputCommitter {
/**
* Concat the passed chunk files into one and rename it the targetFile.
*/
- private void concatFileChunks(Configuration conf, Path targetFile,
- LinkedList<Path> allChunkPaths) throws IOException {
+ private void concatFileChunks(Configuration conf, Path sourceFile,
+ Path targetFile, LinkedList<Path> allChunkPaths)
+ throws IOException {
if (allChunkPaths.size() == 1) {
return;
}
@@ -613,6 +619,7 @@ public class CopyCommitter extends FileOutputCommitter {
+ allChunkPaths.size());
}
FileSystem dstfs = targetFile.getFileSystem(conf);
+ FileSystem srcfs = sourceFile.getFileSystem(conf);
Path firstChunkFile = allChunkPaths.removeFirst();
Path[] restChunkFiles = new Path[allChunkPaths.size()];
@@ -630,6 +637,8 @@ public class CopyCommitter extends FileOutputCommitter {
LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
}
rename(dstfs, firstChunkFile, targetFile);
+ DistCpUtils.compareFileLengthsAndChecksums(
+ srcfs, sourceFile, null, dstfs, targetFile, skipCrc);
}
/**
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index db21f64..fa91930 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -143,15 +143,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
offset, context, fileAttributes, sourceChecksum);
if (!source.isSplit()) {
- compareFileLengths(source, targetPath, configuration, bytesRead
- + offset);
- }
- //At this point, src&dest lengths are same. if length==0, we skip checksum
- if ((bytesRead != 0) && (!skipCrc)) {
- if (!source.isSplit()) {
- compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
- targetFS, targetPath);
- }
+ DistCpUtils.compareFileLengthsAndChecksums(sourceFS, sourcePath,
+ sourceChecksum, targetFS, targetPath, skipCrc);
}
// it's not append or direct write (preferred for s3a) case, thus we first
// write to a temporary file, then rename it to the target path.
@@ -216,51 +209,6 @@ public class RetriableFileCopyCommand extends RetriableCommand {
context);
}
- private void compareFileLengths(CopyListingFileStatus source, Path target,
- Configuration configuration, long targetLen)
- throws IOException {
- final Path sourcePath = source.getPath();
- FileSystem fs = sourcePath.getFileSystem(configuration);
- long srcLen = fs.getFileStatus(sourcePath).getLen();
- if (srcLen != targetLen)
- throw new IOException("Mismatch in length of source:" + sourcePath + " (" + srcLen +
- ") and target:" + target + " (" + targetLen + ")");
- }
-
- private void compareCheckSums(FileSystem sourceFS, Path source,
- FileChecksum sourceChecksum, FileSystem targetFS, Path target)
- throws IOException {
- if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
- targetFS, target)) {
- StringBuilder errorMessage =
- new StringBuilder("Checksum mismatch between ")
- .append(source).append(" and ").append(target).append(".");
- boolean addSkipHint = false;
- String srcScheme = sourceFS.getScheme();
- String targetScheme = targetFS.getScheme();
- if (!srcScheme.equals(targetScheme)
- && !(srcScheme.contains("hdfs") && targetScheme.contains("hdfs"))) {
- // the filesystems are different and they aren't both hdfs connectors
- errorMessage.append("Source and destination filesystems are of"
- + " different types\n")
- .append("Their checksum algorithms may be incompatible");
- addSkipHint = true;
- } else if (sourceFS.getFileStatus(source).getBlockSize() !=
- targetFS.getFileStatus(target).getBlockSize()) {
- errorMessage.append(" Source and target differ in block-size.\n")
- .append(" Use -pb to preserve block-sizes during copy.");
- addSkipHint = true;
- }
- if (addSkipHint) {
- errorMessage.append(" You can skip checksum-checks altogether "
- + " with -skipcrccheck.\n")
- .append(" (NOTE: By skipping checksums, one runs the risk of " +
- "masking data-corruption during file-transfer.)\n");
- }
- throw new IOException(errorMessage.toString());
- }
- }
-
//If target file exists and unable to delete target - fail
//If target doesn't exist and unable to create parent folder - fail
//If target is successfully deleted and parent exists, if rename fails - fail
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index 96a7c5d..3ba9802 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -583,6 +583,66 @@ public class DistCpUtils {
sourceChecksum.equals(targetChecksum));
}
+ /**
+ * Utility to compare file lengths and checksums for source and target.
+ *
+ * @param sourceFS FileSystem for the source path.
+ * @param source The source path.
+ * @param sourceChecksum The checksum of the source file. If it is null we
+ * still need to retrieve it through sourceFS.
+ * @param targetFS FileSystem for the target path.
+ * @param target The target path.
+ * @param skipCrc The flag to indicate whether to skip checksums.
+ * @throws IOException if there's a mismatch in file lengths or checksums.
+ */
+ public static void compareFileLengthsAndChecksums(
+ FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
+ FileSystem targetFS, Path target, boolean skipCrc) throws IOException {
+ long srcLen = sourceFS.getFileStatus(source).getLen();
+ long tgtLen = targetFS.getFileStatus(target).getLen();
+ if (srcLen != tgtLen) {
+ throw new IOException(
+ "Mismatch in length of source:" + source + " (" + srcLen
+ + ") and target:" + target + " (" + tgtLen + ")");
+ }
+
+ //At this point, src & dest lengths are same. if length==0, we skip checksum
+ if ((srcLen != 0) && (!skipCrc)) {
+ if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
+ targetFS, target)) {
+ StringBuilder errorMessage =
+ new StringBuilder("Checksum mismatch between ")
+ .append(source).append(" and ").append(target).append(".");
+ boolean addSkipHint = false;
+ String srcScheme = sourceFS.getScheme();
+ String targetScheme = targetFS.getScheme();
+ if (!srcScheme.equals(targetScheme)) {
+ // the filesystems are different and they aren't both hdfs connectors
+ errorMessage.append("Source and destination filesystems are of"
+ + " different types\n")
+ .append("Their checksum algorithms may be incompatible");
+ addSkipHint = true;
+ } else if (sourceFS.getFileStatus(source).getBlockSize() !=
+ targetFS.getFileStatus(target).getBlockSize()) {
+ errorMessage.append(" Source and target differ in block-size.\n")
+ .append(" Use -pb to preserve block-sizes during copy.");
+ addSkipHint = true;
+ }
+ if (addSkipHint) {
+ errorMessage
+ .append(" You can choose file-level checksum validation via "
+ + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
+ + " or filesystems are different.")
+ .append(" Or you can skip checksum-checks altogether "
+ + " with -skipcrccheck.\n")
+ .append(" (NOTE: By skipping checksums, one runs the risk of " +
+ "masking data-corruption during file-transfer.)\n");
+ }
+ throw new IOException(errorMessage.toString());
+ }
+ }
+ }
+
/*
* Return the Path for a given chunk.
* Used when splitting large file into chunks to copy in parallel.
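As an illustration of the new utility, a hypothetical caller-side sketch (not part of the commit; the paths are placeholders) of a post-copy validation could look like this; passing null for the source checksum makes the utility fetch it from the source FileSystem:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.DistCpUtils;

public class ChecksumValidationSketch {
  public static void validate(Configuration conf) throws IOException {
    Path source = new Path("hdfs:///data/src/file");
    Path target = new Path("hdfs:///data/dst/file");
    FileSystem srcFs = source.getFileSystem(conf);
    FileSystem dstFs = target.getFileSystem(conf);
    // Throws IOException on a length or checksum mismatch;
    // passing skipCrc=true would limit the check to file lengths only.
    DistCpUtils.compareFileLengthsAndChecksums(
        srcFs, source, null, dstFs, target, false);
  }
}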
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 2ef89e5..f4566a6 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
@@ -33,6 +36,7 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
@@ -40,6 +44,7 @@ import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.GlobbedCopyListing;
+import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.security.Credentials;
import org.junit.*;
@@ -55,13 +60,16 @@ public class TestCopyCommitter {
private static final Random rand = new Random();
+ private static final long BLOCK_SIZE = 1024;
private static final Credentials CREDENTIALS = new Credentials();
public static final int PORT = 39737;
- private static Configuration config;
+ private static Configuration clusterConfig;
private static MiniDFSCluster cluster;
+ private Configuration config;
+
private static Job getJobForClient() throws IOException {
Job job = Job.getInstance(new Configuration());
job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
@@ -73,10 +81,17 @@ public class TestCopyCommitter {
@BeforeClass
public static void create() throws IOException {
- config = getJobForClient().getConfiguration();
- config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
- cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
- .build();
+ clusterConfig = getJobForClient().getConfiguration();
+ clusterConfig.setLong(
+ DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
+ clusterConfig.setLong(
+ DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ clusterConfig.setLong(
+ DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ cluster = new MiniDFSCluster.Builder(clusterConfig)
+ .numDataNodes(1)
+ .format(true)
+ .build();
}
@AfterClass
@@ -88,6 +103,7 @@ public class TestCopyCommitter {
@Before
public void createMetaFolder() throws IOException {
+ config = new Configuration(clusterConfig);
config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
Path meta = new Path("/meta");
cluster.getFileSystem().mkdirs(meta);
@@ -397,6 +413,141 @@ public class TestCopyCommitter {
}
}
+ @Test
+ public void testCommitWithChecksumMismatchAndSkipCrc() throws IOException {
+ testCommitWithChecksumMismatch(true);
+ }
+
+ @Test
+ public void testCommitWithChecksumMismatchWithoutSkipCrc()
+ throws IOException {
+ testCommitWithChecksumMismatch(false);
+ }
+
+ private void testCommitWithChecksumMismatch(boolean skipCrc)
+ throws IOException {
+
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+ JobContext jobContext = new JobContextImpl(
+ taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getTaskAttemptID().getJobID());
+ Configuration conf = jobContext.getConfiguration();
+
+ String sourceBase;
+ String targetBase;
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(conf);
+ sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
+ targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
+
+ int blocksPerChunk = 5;
+ String srcFilename = "/srcdata";
+ createSrcAndWorkFilesWithDifferentChecksum(fs, targetBase, sourceBase,
+ srcFilename, blocksPerChunk);
+
+ DistCpOptions options = new DistCpOptions.Builder(
+ Collections.singletonList(new Path(sourceBase)),
+ new Path("/out"))
+ .withBlocksPerChunk(blocksPerChunk)
+ .withCRC(skipCrc)
+ .build();
+ options.appendToConf(conf);
+ conf.setBoolean(
+ DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+ DistCpContext context = new DistCpContext(options);
+ context.setTargetPathExists(false);
+
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+ Path listingFile = new Path("/tmp1/"
+ + String.valueOf(rand.nextLong()));
+ listing.buildListing(listingFile, context);
+
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+ OutputCommitter committer = new CopyCommitter(
+ null, taskAttemptContext);
+ try {
+ committer.commitJob(jobContext);
+ if (!skipCrc) {
+ Assert.fail("Expected commit to fail");
+ }
+ Assert.assertFalse(DistCpUtils.checksumsAreEqual(
+ fs, new Path(sourceBase + srcFilename), null,
+ fs, new Path(targetBase + srcFilename)));
+ } catch(IOException exception) {
+ if (skipCrc) {
+ LOG.error("Unexpected exception is found", exception);
+ throw exception;
+ }
+ Throwable cause = exception.getCause();
+ GenericTestUtils.assertExceptionContains(
+ "Checksum mismatch", cause);
+ }
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp1");
+ TestDistCpUtils.delete(fs, "/meta");
+ }
+ }
+
+ /**
+ * Create a source file and its DistCp working files with different checksum
+ * to test the checksum validation for copying blocks in parallel.
+ *
+ * For the ease of construction, it assumes a source file can be broken down
+ * into 2 working files (or 2 chunks).
+ *
+ * So for a source file with length =
+ * BLOCK_SIZE * blocksPerChunk + BLOCK_SIZE / 2,
+ * its 1st working file will have length =
+ * BLOCK_SIZE * blocksPerChunk,
+ * then the 2nd working file will have length =
+ * BLOCK_SIZE / 2.
+ * And the working files are generated with a different seed to mimic
+ * same length but different checksum scenario.
+ *
+ * @param fs the FileSystem
+ * @param targetBase the path to the working files
+ * @param sourceBase the path to a source file
+ * @param filename the filename to copy and work on
+ * @param blocksPerChunk the blocks per chunk config that enables copying
+ * blocks in parallel
+ * @throws IOException when it fails to create files
+ */
+ private void createSrcAndWorkFilesWithDifferentChecksum(FileSystem fs,
+ String targetBase,
+ String sourceBase,
+ String filename,
+ int blocksPerChunk)
+ throws IOException {
+
+ long srcSeed = System.currentTimeMillis();
+ long dstSeed = srcSeed + rand.nextLong();
+ int bufferLen = 128;
+ short replFactor = 2;
+ Path srcData = new Path(sourceBase + filename);
+
+ // create data with 2 chunks: the 2nd chunk has half of the block size
+ long firstChunkLength = BLOCK_SIZE * blocksPerChunk;
+ long secondChunkLength = BLOCK_SIZE / 2;
+
+ DFSTestUtil.createFile(fs, srcData,
+ bufferLen, firstChunkLength, BLOCK_SIZE, replFactor,
+ srcSeed);
+ DFSTestUtil.appendFileNewBlock((DistributedFileSystem) fs, srcData,
+ (int) secondChunkLength);
+
+ DFSTestUtil.createFile(fs, new Path(targetBase
+ + filename + ".____distcpSplit____0."
+ + firstChunkLength), bufferLen,
+ firstChunkLength, BLOCK_SIZE, replFactor, dstSeed);
+ DFSTestUtil.createFile(fs, new Path(targetBase
+ + filename + ".____distcpSplit____"
+ + firstChunkLength + "." + secondChunkLength), bufferLen,
+ secondChunkLength, BLOCK_SIZE, replFactor, dstSeed);
+ }
+
private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
return new TaskAttemptContextImpl(conf,
new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
index 304e41c..5cf1840 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.tools.ECAdmin;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
@@ -1205,6 +1207,71 @@ public class TestDistCpUtils {
Assert.assertFalse(srcStatus.getReplication() == f2Status.getReplication());
}
+ @Test
+ public void testCompareFileLengthsAndChecksums() throws IOException {
+
+ String base = "/tmp/verify-checksum/";
+ long srcSeed = System.currentTimeMillis();
+ long dstSeed = srcSeed + rand.nextLong();
+ short replFactor = 2;
+
+ FileSystem fs = FileSystem.get(config);
+ Path basePath = new Path(base);
+ fs.mkdirs(basePath);
+
+ // empty lengths comparison
+ Path srcWithLen0 = new Path(base + "srcLen0");
+ Path dstWithLen0 = new Path(base + "dstLen0");
+ fs.create(srcWithLen0).close();
+ fs.create(dstWithLen0).close();
+ DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen0,
+ null, fs, dstWithLen0, false);
+
+ // different lengths comparison
+ Path srcWithLen1 = new Path(base + "srcLen1");
+ Path dstWithLen2 = new Path(base + "dstLen2");
+ DFSTestUtil.createFile(fs, srcWithLen1, 1, replFactor, srcSeed);
+ DFSTestUtil.createFile(fs, dstWithLen2, 2, replFactor, srcSeed);
+ try {
+ DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen1,
+ null, fs, dstWithLen2, false);
+ Assert.fail("Expected different lengths comparison to fail!");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Mismatch in length", e);
+ }
+
+ // checksums matched
+ Path srcWithChecksum1 = new Path(base + "srcChecksum1");
+ Path dstWithChecksum1 = new Path(base + "dstChecksum1");
+ DFSTestUtil.createFile(fs, srcWithChecksum1, 1024,
+ replFactor, srcSeed);
+ DFSTestUtil.createFile(fs, dstWithChecksum1, 1024,
+ replFactor, srcSeed);
+ DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+ null, fs, dstWithChecksum1, false);
+ DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+ fs.getFileChecksum(srcWithChecksum1), fs, dstWithChecksum1,
+ false);
+
+ // checksums mismatched
+ Path dstWithChecksum2 = new Path(base + "dstChecksum2");
+ DFSTestUtil.createFile(fs, dstWithChecksum2, 1024,
+ replFactor, dstSeed);
+ try {
+ DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+ null, fs, dstWithChecksum2, false);
+ Assert.fail("Expected different checksums comparison to fail!");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Checksum mismatch", e);
+ }
+
+ // checksums mismatched but skipped
+ DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+ null, fs, dstWithChecksum2, true);
+ }
+
private static Random rand = new Random();
public static String createTestSetup(FileSystem fs) throws IOException {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java
new file mode 100644
index 0000000..5d44ab0
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Test length and checksums comparison with checksum combine mode.
+ * When the combine mode is COMPOSITE_CRC, it should tolerate different file
+ * systems and different block sizes.
+ */
+public class TestDistCpUtilsWithCombineMode {
+ private static final Logger LOG = LoggerFactory.getLogger(TestDistCpUtils.class);
+
+ private Configuration config;
+ private MiniDFSCluster cluster;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Before
+ public void create() throws IOException {
+ config = new Configuration();
+ if (testName.getMethodName().contains("WithCombineMode")) {
+ config.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
+ }
+ config.setLong(
+ DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 512);
+ cluster = new MiniDFSCluster.Builder(config)
+ .numDataNodes(2)
+ .format(true)
+ .build();
+ }
+
+ @After
+ public void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testChecksumsComparisonWithCombineMode() throws IOException {
+ try {
+ compareSameContentButDiffBlockSizes();
+ } catch (IOException e) {
+ LOG.error("Unexpected exception is found", e);
+ throw e;
+ }
+ }
+
+ @Test
+ public void testChecksumsComparisonWithoutCombineMode() {
+ try {
+ compareSameContentButDiffBlockSizes();
+ Assert.fail("Expected comparison to fail");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Checksum mismatch", e);
+ }
+ }
+
+ private void compareSameContentButDiffBlockSizes() throws IOException {
+ String base = "/tmp/verify-checksum-" + testName.getMethodName() + "/";
+ long seed = System.currentTimeMillis();
+ short rf = 2;
+
+ FileSystem fs = FileSystem.get(config);
+ Path basePath = new Path(base);
+ fs.mkdirs(basePath);
+
+ // create 2 files of same content but different block-sizes
+ Path src = new Path(base + "src");
+ Path dst = new Path(base + "dst");
+ DFSTestUtil.createFile(fs, src, 256, 1024, 512,
+ rf, seed);
+ DFSTestUtil.createFile(fs, dst, 256, 1024, 1024,
+ rf, seed);
+ // then compare
+ DistCpUtils.compareFileLengthsAndChecksums(fs, src,
+ null, fs, dst, false);
+ }
+}
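For completeness, a hypothetical client-side sketch (not part of the commit) of the configuration the combine-mode test above relies on; the property string is the one set in the test, and with COMPOSITE_CRC the reported file checksums become block-size independent, so files with identical content but different block sizes compare equal:

import org.apache.hadoop.conf.Configuration;

public class CompositeCrcConfigSketch {
  public static Configuration withCompositeCrc() {
    Configuration conf = new Configuration();
    // Opt into composite CRC file checksums so cross-block-size
    // (and cross-filesystem) comparisons can succeed.
    conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
    return conf;
  }
}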
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org