You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by me...@apache.org on 2023/02/14 09:47:38 UTC

[hadoop] branch branch-3.3 updated: HADOOP-18596. Distcp -update to use modification time while checking for file skip. (#5387)

This is an automated email from the ASF dual-hosted git repository.

mehakmeet pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a2ceb093235 HADOOP-18596. Distcp -update to use modification time while checking for file skip. (#5387)
a2ceb093235 is described below

commit a2ceb093235e249a1da2733edcf4004af3fc139b
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Tue Feb 14 15:17:27 2023 +0530

    HADOOP-18596. Distcp -update to use modification time while checking for file skip. (#5387)
    
    
    Adding toggleable support for modification time during distcp -update between two stores with incompatible checksum comparison.
    
    Contributed by: Mehakmeet Singh <me...@gmail.com>
---
 .../org/apache/hadoop/tools/DistCpConstants.java   |  20 +++
 .../org/apache/hadoop/tools/mapred/CopyMapper.java |  77 ++++++++++-
 .../org/apache/hadoop/tools/util/DistCpUtils.java  |  30 +++--
 .../hadoop-distcp/src/site/markdown/DistCp.md.vm   |  39 ++++--
 .../tools/contract/AbstractContractDistCpTest.java | 145 +++++++++++++++++++++
 .../hadoop/tools/mapred/TestCopyCommitter.java     |   9 +-
 6 files changed, 296 insertions(+), 24 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index c75c0e85dd7..0291d949a18 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -140,6 +140,26 @@ public final class DistCpConstants {
       "distcp.blocks.per.chunk";
 
   public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";
+
+  /**
+   * Enabling {@code distcp -update} to use modification time of source and
+   * target file when deciding whether to skip a file which has the same
+   * name and size but possibly different content.
+   *
+   * If the target file is perceived as older than the source, it indicates
+   * that the source has been updated since the last sync and is a newer
+   * version than what was copied, so we should not skip the copy.
+   * {@value}
+   */
+  public static final String CONF_LABEL_UPDATE_MOD_TIME =
+      "distcp.update.modification.time";
+
+  /**
+   * Default value for 'distcp.update.modification.time' configuration.
+   */
+  public static final boolean CONF_LABEL_UPDATE_MOD_TIME_DEFAULT =
+      true;
+
   /**
    * Constants for DistCp return code to shell / consumer of ToolRunner's run
    */
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index f3c5b4ba7ae..d0b34f0db39 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.CopyReadException
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME_DEFAULT;
+
 /**
  * Mapper class that executes the DistCp copy operation.
  * Implements the o.a.h.mapreduce.Mapper interface.
@@ -74,6 +76,15 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     OVERWRITE,    // Overwrite the whole file
   }
 
+  /**
+   * Indicates the checksum comparison result.
+   */
+  public enum ChecksumComparison {
+    TRUE,           // both checksums were available and they are equal.
+    FALSE,          // both checksums were available and they differ.
+    INCOMPATIBLE,   // a checksum was unavailable (null): nothing compared.
+  }
+
   private static Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
 
   private Configuration conf;
@@ -85,6 +96,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private boolean append = false;
   private boolean verboseLog = false;
   private boolean directWrite = false;
+  private boolean useModTimeToUpdate;
   private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
 
   private FileSystem targetFS = null;
@@ -114,6 +126,9 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         PRESERVE_STATUS.getConfigLabel()));
     directWrite = conf.getBoolean(
         DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
+    useModTimeToUpdate =
+        conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME,
+            CONF_LABEL_UPDATE_MOD_TIME_DEFAULT);
 
     targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
     Path targetFinalPath = new Path(conf.get(
@@ -350,13 +365,65 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     boolean sameLength = target.getLen() == source.getLen();
     boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
         || !preserve.contains(FileAttribute.BLOCKSIZE);
+    // Skip the copy if a 0 size file is being copied.
+    if (sameLength && source.getLen() == 0) {
+      return true;
+    }
+    // If the src and target file have same size and block size, we would
+    // check if the skipCrc flag is enabled or not. If enabled, and the
+    // modTime comparison is enabled then return true if target file is not
+    // older than the source file, since this indicates that the target file
+    // is recently updated and the source is not changed more recently than
+    // the update, we can skip the copy else we would copy.
+    // If skipCrc flag is disabled, we would check the checksum comparison
+    // which is an enum representing 3 values, of which if the comparison
+    // returns INCOMPATIBLE, we'll fall back to the modtime check, else
+    // return the result of the compatible comparison (true or false).
+    //
+    // Note: Different object stores can have different checksum algorithms
+    // resulting in no checksum comparison that results in return true
+    // always, having the modification time enabled can help in these
+    // scenarios to not incorrectly skip a copy. Refer: HADOOP-18596.
+
     if (sameLength && sameBlockSize) {
-      return skipCrc ||
-          DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
-              targetFS, target.getPath(), source.getLen());
-    } else {
-      return false;
+      if (skipCrc) {
+        return maybeUseModTimeToCompare(source, target);
+      } else {
+        ChecksumComparison checksumComparison = DistCpUtils
+            .checksumsAreEqual(sourceFS, source.getPath(), null,
+                targetFS, target.getPath(), source.getLen());
+        LOG.debug("Result of checksum comparison between src {} and target "
+            + "{} : {}", source, target, checksumComparison);
+        if (checksumComparison.equals(ChecksumComparison.INCOMPATIBLE)) {
+          return maybeUseModTimeToCompare(source, target);
+        }
+        // if skipCrc is disabled and checksumComparison is compatible we
+        // need not check the mod time.
+        return checksumComparison.equals(ChecksumComparison.TRUE);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * If the mod time comparison is enabled, check the mod time else return
+   * true.
+   * Comparison: If the target file has a greater or equal mod time (i.e. it
+   * is not older) than the source file, we can assume that there has been no
+   * new changes that occurred in the source file since the last sync, hence
+   * we should return true to skip the copy of the file.
+   *
+   * @param source Source fileStatus.
+   * @param target Target fileStatus.
+   * @return true if the copy can be skipped, false if it should take place.
+   */
+  private boolean maybeUseModTimeToCompare(
+      CopyListingFileStatus source, FileStatus target) {
+    if (useModTimeToUpdate) {
+      return source.getModificationTime() <= target.getModificationTime();
     }
+    // mod time check is disabled: return true (skip the copy).
+    return true;
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index 1af434e19f8..e77b2031a76 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpContext;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
@@ -568,10 +569,12 @@ public class DistCpUtils {
    * and false otherwise.
    * @throws IOException if there's an exception while retrieving checksums.
    */
-  public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
-                                          FileChecksum sourceChecksum,
-                                          FileSystem targetFS,
-                                          Path target, long sourceLen)
+  public static CopyMapper.ChecksumComparison checksumsAreEqual(
+      FileSystem sourceFS,
+      Path source,
+      FileChecksum sourceChecksum,
+      FileSystem targetFS,
+      Path target, long sourceLen)
       throws IOException {
     FileChecksum targetChecksum = null;
     try {
@@ -585,8 +588,15 @@ public class DistCpUtils {
     } catch (IOException e) {
       LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
     }
-    return (sourceChecksum == null || targetChecksum == null ||
-            sourceChecksum.equals(targetChecksum));
+    // If the source or target checksum is null, no comparison took place,
+    // so report the result as incompatible.
+    // Otherwise report whether the two checksums matched (TRUE or FALSE).
+    if (sourceChecksum == null || targetChecksum == null) {
+      return CopyMapper.ChecksumComparison.INCOMPATIBLE;
+    } else if (sourceChecksum.equals(targetChecksum)) {
+      return CopyMapper.ChecksumComparison.TRUE;
+    }
+    return CopyMapper.ChecksumComparison.FALSE;
   }
 
   /**
@@ -613,8 +623,12 @@ public class DistCpUtils {
 
     //At this point, src & dest lengths are same. if length==0, we skip checksum
     if ((srcLen != 0) && (!skipCrc)) {
-      if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
-          targetFS, target, srcLen)) {
+      CopyMapper.ChecksumComparison
+          checksumComparison = checksumsAreEqual(sourceFS, source, sourceChecksum,
+              targetFS, target, srcLen);
+      // If Checksum comparison is false set it to false, else set to true.
+      boolean checksumResult = !checksumComparison.equals(CopyMapper.ChecksumComparison.FALSE);
+      if (!checksumResult) {
         StringBuilder errorMessage =
             new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
                 .append(source).append(" and ").append(target).append(".");
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index 560ec55d2b2..73f776c2bff 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -630,14 +630,37 @@ hadoop distcp -update -numListstatusThreads 20  \
 Because object stores are slow to list files, consider setting the `-numListstatusThreads` option when performing a `-update` operation
 on a large directory tree (the limit is 40 threads).
 
-When `DistCp -update` is used with object stores,
-generally only the modification time and length of the individual files are compared,
-not any checksums. The fact that most object stores do have valid timestamps
-for directories is irrelevant; only the file timestamps are compared.
-However, it is important to have the clock of the client computers close
-to that of the infrastructure, so that timestamps are consistent between
-the client/HDFS cluster and that of the object store. Otherwise, changed files may be
-missed/copied too often.
+When `DistCp -update` is used with object stores, generally only the
+modification time and length of the individual files are compared, not any
+checksums if the checksum algorithm between the two stores is different.
+
+* The `distcp -update` between two object stores with different checksum
+  algorithm compares the modification times of source and target files along
+  with the file size to determine whether to skip the file copy. The behavior
+  is controlled by the property `distcp.update.modification.time`, which is
+  set to true by default. If the source file is more recently modified than
+  the target file, it is assumed that the content has changed, and the file
+  should be updated.
+  We need to ensure that there is no clock skew between the machines.
+  The fact that most object stores do have valid timestamps for directories
+  is irrelevant; only the file timestamps are compared. However, it is
+  important to have the clock of the client computers close to that of the
+  infrastructure, so that timestamps are consistent between the client/HDFS
+  cluster and that of the object store. Otherwise, changed files may be
+  missed/copied too often.
+
+* `distcp.update.modification.time` would only be used if either of the two
+  stores doesn't have checksum validation, resulting in an incompatible
+  checksum comparison between the two. Even if the property is set to true,
+  it won't be used if there is a valid checksum comparison between the stores.
+
+To turn off the modification time check, set this in your core-site.xml
+```xml
+<property>
+        <name>distcp.update.modification.time</name>
+        <value>false</value>
+</property>
+```
 
 **Notes**
 
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
index 8545df30bac..532abc2aa40 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -72,6 +73,9 @@ public abstract class AbstractContractDistCpTest
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractContractDistCpTest.class);
 
+  /** Using offset to change modification time in tests. */
+  private static final long MODIFICATION_TIME_OFFSET = 10000;
+
   public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
       = "scale.test.distcp.file.size.kb";
 
@@ -354,6 +358,29 @@ public abstract class AbstractContractDistCpTest
             .withOverwrite(false)));
   }
 
+  /**
+   * Run distcp -update srcDir destDir, listing both trees before the job.
+   * @param srcDir source directory; it is listed through sourceFs.
+   * @param destDir destination directory; it is listed through targetFs.
+   * @return the completed job
+   * @throws Exception any failure.
+   */
+  private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
+      FileSystem sourceFs, FileSystem targetFs)
+      throws Exception {
+    describe("\nDistcp -update from " + srcDir + " to " + destDir);
+    lsR("Source Fs to update", sourceFs, srcDir);
+    lsR("Target Fs before update", targetFs, destDir);
+    return runDistCp(buildWithStandardOptions(
+        new DistCpOptions.Builder(
+            Collections.singletonList(srcDir), destDir)
+            .withDeleteMissing(true)
+            .withSyncFolder(true)
+            .withSkipCRC(false)
+            .withDirectWrite(shouldUseDirectWrite())
+            .withOverwrite(false)));
+  }
+
   /**
    * Update the source directories as various tests expect,
    * including adding a new file.
@@ -857,4 +884,122 @@ public abstract class AbstractContractDistCpTest
     verifyFileContents(localFS, dest, block);
   }
 
+  @Test
+  public void testDistCpUpdateCheckFileSkip() throws Exception {
+    describe("Distcp update to check file skips.");
+
+    Path source = new Path(remoteDir, "file");
+    Path dest = new Path(localDir, "file");
+
+    Path source0byte = new Path(remoteDir, "file_0byte");
+    Path dest0byte = new Path(localDir, "file_0byte");
+    dest = localFS.makeQualified(dest);
+    dest0byte = localFS.makeQualified(dest0byte);
+
+    // Creating a source file with certain dataset.
+    byte[] sourceBlock = dataset(10, 'a', 'z');
+
+    // Write the dataset and as well create the target path.
+    ContractTestUtils.createFile(localFS, dest, true, sourceBlock);
+    ContractTestUtils
+        .writeDataset(remoteFS, source, sourceBlock, sourceBlock.length,
+            1024, true);
+
+    // Create 0 byte source and target files.
+    ContractTestUtils.createFile(remoteFS, source0byte, true, new byte[0]);
+    ContractTestUtils.createFile(localFS, dest0byte, true, new byte[0]);
+
+    // Execute the distcp -update job.
+    Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS);
+
+    // First distcp -update would normally copy the source to dest.
+    verifyFileContents(localFS, dest, sourceBlock);
+    // Verify 1 file was skipped in the distcp -update (The 0 byte file).
+    // Verify 1 file was copied in the distcp -update (The new source file).
+    verifySkipAndCopyCounter(job, 1, 1);
+
+    // Remove the source file and replace with a file with same name and size
+    // but different content.
+    remoteFS.delete(source, false);
+    Path updatedSource = new Path(remoteDir, "file");
+    byte[] updatedSourceBlock = dataset(10, 'b', 'z');
+    ContractTestUtils.writeDataset(remoteFS, updatedSource,
+        updatedSourceBlock, updatedSourceBlock.length, 1024, true);
+
+    // For testing purposes we would take the modification time of the
+    // updated Source file and add an offset or subtract the offset and set
+    // that time as the modification time for target file, this way we can
+    // ensure that our test can emulate a scenario where source is either more
+    // recently changed after -update so that copy takes place or target file
+    // is more recently changed which would skip the copying since the source
+    // has not been recently updated.
+    FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource);
+    long modTimeSourceUpd = fsSourceUpd.getModificationTime();
+
+    // Add by an offset which would ensure enough gap for the test to
+    // not fail due to race conditions.
+    long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
+    localFS.setTimes(dest, newTargetModTimeNew, -1);
+
+    // Execute the distcp -update job.
+    Job updatedSourceJobOldSrc =
+        distCpUpdateWithFs(remoteDir, localDir, remoteFS,
+            localFS);
+
+    // File contents should remain same since the mod time for target is
+    // newer than the updatedSource which indicates that the sync happened
+    // more recently and there is no update.
+    verifyFileContents(localFS, dest, sourceBlock);
+    // Skipped both 0 byte file and sourceFile (since mod time of target is
+    // newer than the source it is perceived that source is of older version
+    // and we can skip its copy).
+    verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0);
+
+    // Subtract by an offset (never going below 0) which would ensure enough
+    // gap for the test to not fail due to race conditions.
+    long newTargetModTimeOld =
+        Math.max(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
+    localFS.setTimes(dest, newTargetModTimeOld, -1);
+
+    // Execute the distcp -update job.
+    Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir,
+        remoteFS,
+        localFS);
+
+    // Verifying the target directory have both 0 byte file and the content
+    // file.
+    Assertions
+        .assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true)))
+        .hasSize(2);
+    // Now the copy should take place and the file contents should change
+    // since the mod time for target is older than the source file indicating
+    // that there was an update to the source after the last sync took place.
+    verifyFileContents(localFS, dest, updatedSourceBlock);
+    // Verifying we skipped the 0 byte file and copied the updated source
+    // file (since the modification time of the new source is newer than the
+    // target now).
+    verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1);
+  }
+
+  /**
+   * Method to check the skipped and copied counters of a distcp job.
+   *
+   * @param job               job to check.
+   * @param skipExpectedValue expected skip counter value.
+   * @param copyExpectedValue expected copy counter value.
+   * @throws IOException thrown if reading the job counters fails.
+   */
+  private void verifySkipAndCopyCounter(Job job,
+      int skipExpectedValue, int copyExpectedValue) throws IOException {
+    // get the skip and copy counters from the job.
+    long skipActualValue = job.getCounters()
+        .findCounter(CopyMapper.Counter.SKIP).getValue();
+    long copyActualValue = job.getCounters()
+        .findCounter(CopyMapper.Counter.COPY).getValue();
+    // Verify that the actual values equal the expected ones (COPY first).
+    assertEquals("Mismatch in COPY counter value", copyExpectedValue,
+        copyActualValue);
+    assertEquals("Mismatch in SKIP counter value", skipExpectedValue,
+        skipActualValue);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index bda80a3d25e..f2dd246db5a 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -563,9 +563,12 @@ public class TestCopyCommitter {
         Path sourcePath = new Path(sourceBase + srcFilename);
         CopyListingFileStatus sourceCurrStatus =
                 new CopyListingFileStatus(fs.getFileStatus(sourcePath));
-        Assert.assertFalse(DistCpUtils.checksumsAreEqual(
-            fs, new Path(sourceBase + srcFilename), null,
-            fs, new Path(targetBase + srcFilename), sourceCurrStatus.getLen()));
+        Assert.assertEquals("Checksum should not be equal",
+            CopyMapper.ChecksumComparison.FALSE,
+            DistCpUtils.checksumsAreEqual(
+                fs, new Path(sourceBase + srcFilename), null,
+                fs, new Path(targetBase + srcFilename),
+                sourceCurrStatus.getLen()));
       } catch(IOException exception) {
         if (skipCrc) {
           LOG.error("Unexpected exception is found", exception);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org