Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/19 01:56:28 UTC

[hadoop] branch branch-3.1 updated: HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c1a2b29  HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919)
c1a2b29 is described below

commit c1a2b29c0f989b01b65060f2f114145392fcc320
Author: KAI XIE <gi...@gmail.com>
AuthorDate: Mon Aug 19 09:46:31 2019 +0800

    HADOOP-16158. DistCp to support checksum validation when copy blocks in parallel (#919)
    
    * DistCp to support checksum validation when copy blocks in parallel
    
    * address review comments
    
    * add checksums comparison test for combine mode
    
    (cherry picked from commit c765584eb231f8482f5b90b7e8f61f9f7a931d09)
    (cherry picked from commit b3c14d4132ed6aa871bb88c4f84f3e3d90da6f93)
    
    Conflicts:
    	hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
---
 .../apache/hadoop/tools/mapred/CopyCommitter.java  |  15 +-
 .../tools/mapred/RetriableFileCopyCommand.java     |  56 +------
 .../org/apache/hadoop/tools/util/DistCpUtils.java  |  60 ++++++++
 .../hadoop/tools/mapred/TestCopyCommitter.java     | 161 ++++++++++++++++++++-
 .../apache/hadoop/tools/util/TestDistCpUtils.java  |  67 +++++++++
 .../tools/util/TestDistCpUtilsWithCombineMode.java | 115 +++++++++++++++
 6 files changed, 412 insertions(+), 62 deletions(-)
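
In short, the patch factors the length and checksum comparison out of RetriableFileCopyCommand into a new shared helper, DistCpUtils.compareFileLengthsAndChecksums, and makes CopyCommitter call it after concatenating the per-chunk work files produced when blocks are copied in parallel (DistCpOptionSwitch.BLOCKS_PER_CHUNK), honoring -skipcrccheck. A minimal sketch of calling the new helper directly follows; the Configuration and file paths are illustrative assumptions, not part of the commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.util.DistCpUtils;

    public class CompareLengthsAndChecksumsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative paths; substitute real source/target files.
        Path source = new Path("hdfs:///data/src-file");
        Path target = new Path("hdfs:///data/dst-file");
        FileSystem sourceFS = source.getFileSystem(conf);
        FileSystem targetFS = target.getFileSystem(conf);
        // Passing null for the source checksum lets the helper fetch it from
        // sourceFS; skipCrc=false asks for both length and checksum checks.
        // The helper throws IOException on any mismatch, as in the patch below.
        DistCpUtils.compareFileLengthsAndChecksums(
            sourceFS, source, null, targetFS, target, false);
      }
    }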

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index d7a730d..546062f 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -73,6 +73,7 @@ public class CopyCommitter extends FileOutputCommitter {
   private boolean overwrite = false;
   private boolean targetPathExists = true;
   private boolean ignoreFailures = false;
+  private boolean skipCrc = false;
   private int blocksPerChunk = 0;
 
   /**
@@ -87,6 +88,9 @@ public class CopyCommitter extends FileOutputCommitter {
     blocksPerChunk = context.getConfiguration().getInt(
         DistCpOptionSwitch.BLOCKS_PER_CHUNK.getConfigLabel(), 0);
     LOG.debug("blocks per chunk {}", blocksPerChunk);
+    skipCrc = context.getConfiguration().getBoolean(
+        DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
+    LOG.debug("skip CRC is {}", skipCrc);
     this.taskAttemptContext = context;
   }
 
@@ -247,7 +251,8 @@ public class CopyCommitter extends FileOutputCommitter {
             == srcFileStatus.getLen()) {
           // This is the last chunk of the splits, consolidate allChunkPaths
           try {
-            concatFileChunks(conf, targetFile, allChunkPaths);
+            concatFileChunks(conf, srcFileStatus.getPath(), targetFile,
+                allChunkPaths);
           } catch (IOException e) {
             // If the concat failed because a chunk file doesn't exist,
             // then we assume that the CopyMapper has skipped copying this
@@ -603,8 +608,9 @@ public class CopyCommitter extends FileOutputCommitter {
   /**
    * Concat the passed chunk files into one and rename it the targetFile.
    */
-  private void concatFileChunks(Configuration conf, Path targetFile,
-      LinkedList<Path> allChunkPaths) throws IOException {
+  private void concatFileChunks(Configuration conf, Path sourceFile,
+                                Path targetFile, LinkedList<Path> allChunkPaths)
+      throws IOException {
     if (allChunkPaths.size() == 1) {
       return;
     }
@@ -613,6 +619,7 @@ public class CopyCommitter extends FileOutputCommitter {
           + allChunkPaths.size());
     }
     FileSystem dstfs = targetFile.getFileSystem(conf);
+    FileSystem srcfs = sourceFile.getFileSystem(conf);
 
     Path firstChunkFile = allChunkPaths.removeFirst();
     Path[] restChunkFiles = new Path[allChunkPaths.size()];
@@ -630,6 +637,8 @@ public class CopyCommitter extends FileOutputCommitter {
       LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
     }
     rename(dstfs, firstChunkFile, targetFile);
+    DistCpUtils.compareFileLengthsAndChecksums(
+        srcfs, sourceFile, null, dstfs, targetFile, skipCrc);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 25c1de1..765c8f8 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -144,15 +144,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           offset, context, fileAttributes, sourceChecksum);
 
       if (!source.isSplit()) {
-        compareFileLengths(source, targetPath, configuration, bytesRead
-            + offset);
-      }
-      //At this point, src&dest lengths are same. if length==0, we skip checksum
-      if ((bytesRead != 0) && (!skipCrc)) {
-        if (!source.isSplit()) {
-          compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
-              targetFS, targetPath);
-        }
+        DistCpUtils.compareFileLengthsAndChecksums(sourceFS, sourcePath,
+            sourceChecksum, targetFS, targetPath, skipCrc);
       }
       // it's not append or direct write (preferred for s3a) case, thus we first
       // write to a temporary file, then rename it to the target path.
@@ -217,51 +210,6 @@ public class RetriableFileCopyCommand extends RetriableCommand {
         context);
   }
 
-  private void compareFileLengths(CopyListingFileStatus source, Path target,
-                                  Configuration configuration, long targetLen)
-                                  throws IOException {
-    final Path sourcePath = source.getPath();
-    FileSystem fs = sourcePath.getFileSystem(configuration);
-    long srcLen = fs.getFileStatus(sourcePath).getLen();
-    if (srcLen != targetLen)
-      throw new IOException("Mismatch in length of source:" + sourcePath + " (" + srcLen +
-          ") and target:" + target + " (" + targetLen + ")");
-  }
-
-  private void compareCheckSums(FileSystem sourceFS, Path source,
-      FileChecksum sourceChecksum, FileSystem targetFS, Path target)
-      throws IOException {
-    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
-        targetFS, target)) {
-      StringBuilder errorMessage =
-          new StringBuilder("Checksum mismatch between ")
-              .append(source).append(" and ").append(target).append(".");
-      boolean addSkipHint = false;
-      String srcScheme = sourceFS.getScheme();
-      String targetScheme = targetFS.getScheme();
-      if (!srcScheme.equals(targetScheme)
-          && !(srcScheme.contains("hdfs") && targetScheme.contains("hdfs"))) {
-        // the filesystems are different and they aren't both hdfs connectors
-        errorMessage.append("Source and destination filesystems are of"
-            + " different types\n")
-            .append("Their checksum algorithms may be incompatible");
-        addSkipHint = true;
-      } else if (sourceFS.getFileStatus(source).getBlockSize() !=
-          targetFS.getFileStatus(target).getBlockSize()) {
-        errorMessage.append(" Source and target differ in block-size.\n")
-            .append(" Use -pb to preserve block-sizes during copy.");
-        addSkipHint = true;
-      }
-      if (addSkipHint) {
-        errorMessage.append(" You can skip checksum-checks altogether "
-            + " with -skipcrccheck.\n")
-            .append(" (NOTE: By skipping checksums, one runs the risk of " +
-                "masking data-corruption during file-transfer.)\n");
-      }
-      throw new IOException(errorMessage.toString());
-    }
-  }
-
   //If target file exists and unable to delete target - fail
   //If target doesn't exist and unable to create parent folder - fail
   //If target is successfully deleted and parent exists, if rename fails - fail
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index 2a60e80..496913f0 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -582,6 +582,66 @@ public class DistCpUtils {
             sourceChecksum.equals(targetChecksum));
   }
 
+  /**
+   * Utility to compare file lengths and checksums for source and target.
+   *
+   * @param sourceFS FileSystem for the source path.
+   * @param source The source path.
+   * @param sourceChecksum The checksum of the source file. If it is null we
+   * still need to retrieve it through sourceFS.
+   * @param targetFS FileSystem for the target path.
+   * @param target The target path.
+   * @param skipCrc The flag to indicate whether to skip checksums.
+   * @throws IOException if there's a mismatch in file lengths or checksums.
+   */
+  public static void compareFileLengthsAndChecksums(
+      FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
+      FileSystem targetFS, Path target, boolean skipCrc) throws IOException {
+    long srcLen = sourceFS.getFileStatus(source).getLen();
+    long tgtLen = targetFS.getFileStatus(target).getLen();
+    if (srcLen != tgtLen) {
+      throw new IOException(
+          "Mismatch in length of source:" + source + " (" + srcLen
+              + ") and target:" + target + " (" + tgtLen + ")");
+    }
+
+    //At this point, src & dest lengths are same. if length==0, we skip checksum
+    if ((srcLen != 0) && (!skipCrc)) {
+      if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
+          targetFS, target)) {
+        StringBuilder errorMessage =
+            new StringBuilder("Checksum mismatch between ")
+                .append(source).append(" and ").append(target).append(".");
+        boolean addSkipHint = false;
+        String srcScheme = sourceFS.getScheme();
+        String targetScheme = targetFS.getScheme();
+        if (!srcScheme.equals(targetScheme)) {
+          // the filesystems are different and they aren't both hdfs connectors
+          errorMessage.append("Source and destination filesystems are of"
+              + " different types\n")
+              .append("Their checksum algorithms may be incompatible");
+          addSkipHint = true;
+        } else if (sourceFS.getFileStatus(source).getBlockSize() !=
+            targetFS.getFileStatus(target).getBlockSize()) {
+          errorMessage.append(" Source and target differ in block-size.\n")
+              .append(" Use -pb to preserve block-sizes during copy.");
+          addSkipHint = true;
+        }
+        if (addSkipHint) {
+          errorMessage
+              .append(" You can choose file-level checksum validation via "
+                  + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
+                  + " or filesystems are different.")
+              .append(" Or you can skip checksum-checks altogether "
+                  + " with -skipcrccheck.\n")
+              .append(" (NOTE: By skipping checksums, one runs the risk of " +
+                  "masking data-corruption during file-transfer.)\n");
+        }
+        throw new IOException(errorMessage.toString());
+      }
+    }
+  }
+
   /*
    * Return the Path for a given chunk.
    * Used when splitting large file into chunks to copy in parallel.
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 2f95eff..c8a77cc 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
@@ -33,6 +36,7 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.CopyListing;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
@@ -40,6 +44,7 @@ import org.apache.hadoop.tools.DistCpContext;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.GlobbedCopyListing;
+import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.apache.hadoop.security.Credentials;
 import org.junit.*;
@@ -55,13 +60,16 @@ public class TestCopyCommitter {
 
   private static final Random rand = new Random();
 
+  private static final long BLOCK_SIZE = 1024;
   private static final Credentials CREDENTIALS = new Credentials();
   public static final int PORT = 39737;
 
 
-  private static Configuration config;
+  private static Configuration clusterConfig;
   private static MiniDFSCluster cluster;
 
+  private Configuration config;
+
   private static Job getJobForClient() throws IOException {
     Job job = Job.getInstance(new Configuration());
     job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
@@ -73,10 +81,17 @@ public class TestCopyCommitter {
 
   @BeforeClass
   public static void create() throws IOException {
-    config = getJobForClient().getConfiguration();
-    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
-    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
-                      .build();
+    clusterConfig = getJobForClient().getConfiguration();
+    clusterConfig.setLong(
+        DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
+    clusterConfig.setLong(
+        DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    clusterConfig.setLong(
+        DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(clusterConfig)
+        .numDataNodes(1)
+        .format(true)
+        .build();
   }
 
   @AfterClass
@@ -88,6 +103,7 @@ public class TestCopyCommitter {
 
   @Before
   public void createMetaFolder() throws IOException {
+    config = new Configuration(clusterConfig);
     config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
     Path meta = new Path("/meta");
     cluster.getFileSystem().mkdirs(meta);
@@ -397,6 +413,141 @@ public class TestCopyCommitter {
     }
   }
 
+  @Test
+  public void testCommitWithChecksumMismatchAndSkipCrc() throws IOException {
+    testCommitWithChecksumMismatch(true);
+  }
+
+  @Test
+  public void testCommitWithChecksumMismatchWithoutSkipCrc()
+      throws IOException {
+    testCommitWithChecksumMismatch(false);
+  }
+
+  private void testCommitWithChecksumMismatch(boolean skipCrc)
+      throws IOException {
+
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContextImpl(
+        taskAttemptContext.getConfiguration(),
+        taskAttemptContext.getTaskAttemptID().getJobID());
+    Configuration conf = jobContext.getConfiguration();
+
+    String sourceBase;
+    String targetBase;
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(conf);
+      sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
+      targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
+
+      int blocksPerChunk = 5;
+      String srcFilename = "/srcdata";
+      createSrcAndWorkFilesWithDifferentChecksum(fs, targetBase, sourceBase,
+          srcFilename, blocksPerChunk);
+
+      DistCpOptions options = new DistCpOptions.Builder(
+          Collections.singletonList(new Path(sourceBase)),
+          new Path("/out"))
+          .withBlocksPerChunk(blocksPerChunk)
+          .withCRC(skipCrc)
+          .build();
+      options.appendToConf(conf);
+      conf.setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+      DistCpContext context = new DistCpContext(options);
+      context.setTargetPathExists(false);
+
+      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+      Path listingFile = new Path("/tmp1/"
+          + String.valueOf(rand.nextLong()));
+      listing.buildListing(listingFile, context);
+
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+      OutputCommitter committer = new CopyCommitter(
+          null, taskAttemptContext);
+      try {
+        committer.commitJob(jobContext);
+        if (!skipCrc) {
+          Assert.fail("Expected commit to fail");
+        }
+        Assert.assertFalse(DistCpUtils.checksumsAreEqual(
+            fs, new Path(sourceBase + srcFilename), null,
+            fs, new Path(targetBase + srcFilename)));
+      } catch(IOException exception) {
+        if (skipCrc) {
+          LOG.error("Unexpected exception is found", exception);
+          throw exception;
+        }
+        Throwable cause = exception.getCause();
+        GenericTestUtils.assertExceptionContains(
+            "Checksum mismatch", cause);
+      }
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp1");
+      TestDistCpUtils.delete(fs, "/meta");
+    }
+  }
+
+  /**
+   * Create a source file and its DistCp working files with different checksum
+   * to test the checksum validation for copying blocks in parallel.
+   *
+   * For the ease of construction, it assumes a source file can be broken down
+   * into 2 working files (or 2 chunks).
+   *
+   * So for a source file with length =
+   *     BLOCK_SIZE * blocksPerChunk + BLOCK_SIZE / 2,
+   * its 1st working file will have length =
+   *     BLOCK_SIZE * blocksPerChunk,
+   * then the 2nd working file will have length =
+   *     BLOCK_SIZE / 2.
+   * And the working files are generated with a different seed to mimic
+   * same length but different checksum scenario.
+   *
+   * @param fs the FileSystem
+   * @param targetBase the path to the working files
+   * @param sourceBase the path to a source file
+   * @param filename the filename to copy and work on
+   * @param blocksPerChunk the blocks per chunk config that enables copying
+   *                       blocks in parallel
+   * @throws IOException when it fails to create files
+   */
+  private void createSrcAndWorkFilesWithDifferentChecksum(FileSystem fs,
+                                                          String targetBase,
+                                                          String sourceBase,
+                                                          String filename,
+                                                          int blocksPerChunk)
+      throws IOException {
+
+    long srcSeed = System.currentTimeMillis();
+    long dstSeed = srcSeed + rand.nextLong();
+    int bufferLen = 128;
+    short replFactor = 2;
+    Path srcData = new Path(sourceBase + filename);
+
+    // create data with 2 chunks: the 2nd chunk has half of the block size
+    long firstChunkLength = BLOCK_SIZE * blocksPerChunk;
+    long secondChunkLength = BLOCK_SIZE / 2;
+
+    DFSTestUtil.createFile(fs, srcData,
+        bufferLen, firstChunkLength, BLOCK_SIZE, replFactor,
+        srcSeed);
+    DFSTestUtil.appendFileNewBlock((DistributedFileSystem) fs, srcData,
+        (int) secondChunkLength);
+
+    DFSTestUtil.createFile(fs, new Path(targetBase
+            + filename + ".____distcpSplit____0."
+            + firstChunkLength), bufferLen,
+        firstChunkLength, BLOCK_SIZE, replFactor, dstSeed);
+    DFSTestUtil.createFile(fs, new Path(targetBase
+            + filename + ".____distcpSplit____"
+            + firstChunkLength + "." + secondChunkLength), bufferLen,
+        secondChunkLength, BLOCK_SIZE, replFactor, dstSeed);
+  }
+
   private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
     return new TaskAttemptContextImpl(conf,
         new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
index 311c1b3..2f610ab 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
@@ -26,10 +26,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.tools.ECAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
@@ -1119,6 +1121,71 @@ public class TestDistCpUtils {
     Assert.assertFalse(srcStatus.getReplication() == f2Status.getReplication());
   }
 
+  @Test
+  public void testCompareFileLengthsAndChecksums() throws IOException {
+
+    String base = "/tmp/verify-checksum/";
+    long srcSeed = System.currentTimeMillis();
+    long dstSeed = srcSeed + rand.nextLong();
+    short replFactor = 2;
+
+    FileSystem fs = FileSystem.get(config);
+    Path basePath = new Path(base);
+    fs.mkdirs(basePath);
+
+    // empty lengths comparison
+    Path srcWithLen0 = new Path(base + "srcLen0");
+    Path dstWithLen0 = new Path(base + "dstLen0");
+    fs.create(srcWithLen0).close();
+    fs.create(dstWithLen0).close();
+    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen0,
+        null, fs, dstWithLen0, false);
+
+    // different lengths comparison
+    Path srcWithLen1 = new Path(base + "srcLen1");
+    Path dstWithLen2 = new Path(base + "dstLen2");
+    DFSTestUtil.createFile(fs, srcWithLen1, 1, replFactor, srcSeed);
+    DFSTestUtil.createFile(fs, dstWithLen2, 2, replFactor, srcSeed);
+    try {
+      DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen1,
+          null, fs, dstWithLen2, false);
+      Assert.fail("Expected different lengths comparison to fail!");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Mismatch in length", e);
+    }
+
+    // checksums matched
+    Path srcWithChecksum1 = new Path(base + "srcChecksum1");
+    Path dstWithChecksum1 = new Path(base + "dstChecksum1");
+    DFSTestUtil.createFile(fs, srcWithChecksum1, 1024,
+        replFactor, srcSeed);
+    DFSTestUtil.createFile(fs, dstWithChecksum1, 1024,
+        replFactor, srcSeed);
+    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+        null, fs, dstWithChecksum1, false);
+    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+        fs.getFileChecksum(srcWithChecksum1), fs, dstWithChecksum1,
+        false);
+
+    // checksums mismatched
+    Path dstWithChecksum2 = new Path(base + "dstChecksum2");
+    DFSTestUtil.createFile(fs, dstWithChecksum2, 1024,
+        replFactor, dstSeed);
+    try {
+      DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+          null, fs, dstWithChecksum2, false);
+      Assert.fail("Expected different checksums comparison to fail!");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Checksum mismatch", e);
+    }
+
+    // checksums mismatched but skipped
+    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+        null, fs, dstWithChecksum2, true);
+  }
+
   private static Random rand = new Random();
 
   public static String createTestSetup(FileSystem fs) throws IOException {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java
new file mode 100644
index 0000000..5d44ab0
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Test length and checksums comparison with checksum combine mode.
+ * When the combine mode is COMPOSITE_CRC, it should tolerate different file
+ * systems and different block sizes.
+ */
+public class TestDistCpUtilsWithCombineMode {
+  private static final Logger LOG = LoggerFactory.getLogger(TestDistCpUtils.class);
+
+  private Configuration config;
+  private MiniDFSCluster cluster;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Before
+  public void create() throws IOException {
+    config = new Configuration();
+    if (testName.getMethodName().contains("WithCombineMode")) {
+      config.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
+    }
+    config.setLong(
+        DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 512);
+    cluster = new MiniDFSCluster.Builder(config)
+        .numDataNodes(2)
+        .format(true)
+        .build();
+  }
+
+  @After
+  public void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testChecksumsComparisonWithCombineMode() throws IOException {
+    try {
+      compareSameContentButDiffBlockSizes();
+    } catch (IOException e) {
+      LOG.error("Unexpected exception is found", e);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testChecksumsComparisonWithoutCombineMode() {
+    try {
+      compareSameContentButDiffBlockSizes();
+      Assert.fail("Expected comparison to fail");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Checksum mismatch", e);
+    }
+  }
+
+  private void compareSameContentButDiffBlockSizes() throws IOException {
+    String base = "/tmp/verify-checksum-" + testName.getMethodName() + "/";
+    long seed = System.currentTimeMillis();
+    short rf = 2;
+
+    FileSystem fs = FileSystem.get(config);
+    Path basePath = new Path(base);
+    fs.mkdirs(basePath);
+
+    // create 2 files of same content but different block-sizes
+    Path src = new Path(base + "src");
+    Path dst = new Path(base + "dst");
+    DFSTestUtil.createFile(fs, src, 256, 1024, 512,
+        rf, seed);
+    DFSTestUtil.createFile(fs, dst, 256, 1024, 1024,
+        rf, seed);
+    // then compare
+    DistCpUtils.compareFileLengthsAndChecksums(fs, src,
+        null, fs, dst, false);
+  }
+}
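
The new TestDistCpUtilsWithCombineMode suite above exercises the hint that the patch adds to the mismatch message: with dfs.checksum.combine.mode set to COMPOSITE_CRC, two HDFS files holding the same bytes but written with different block sizes compare as equal, while the default block-level checksum mode reports a mismatch. A standalone sketch of that scenario is below; the paths, seed, and a running HDFS instance that permits small block sizes (as the MiniDFSCluster in the test does) are assumptions, not part of the commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.tools.util.DistCpUtils;

    public class CombineModeComparisonSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Ask the HDFS client for a block-size-independent, file-level CRC.
        conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
        FileSystem fs = FileSystem.get(conf);

        // Two files with identical content but different block sizes
        // (assumes the namenode allows 512- and 1024-byte blocks).
        Path src = new Path("/tmp/verify-checksum/src");
        Path dst = new Path("/tmp/verify-checksum/dst");
        long seed = 42L;
        DFSTestUtil.createFile(fs, src, 256, 1024, 512, (short) 2, seed);
        DFSTestUtil.createFile(fs, dst, 256, 1024, 1024, (short) 2, seed);

        // Succeeds under COMPOSITE_CRC; without it this call would throw
        // an IOException("Checksum mismatch ...") because block sizes differ.
        DistCpUtils.compareFileLengthsAndChecksums(fs, src, null, fs, dst, false);
      }
    }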

