You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/06 00:04:18 UTC

[11/27] hadoop git commit: HDFS-7535. Utilize Snapshot diff report for distcp. Contributed by Jing Zhao.

HDFS-7535. Utilize Snapshot diff report for distcp. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed70fa14
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed70fa14
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed70fa14

Branch: refs/heads/YARN-2928
Commit: ed70fa142cabdbc1065e4dbbc95e99c8850c4751
Parents: 03cc229
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Mar 4 10:30:53 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Mar 4 10:30:53 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/tools/CopyListing.java    |   4 +-
 .../java/org/apache/hadoop/tools/DiffInfo.java  |  90 +++++
 .../java/org/apache/hadoop/tools/DistCp.java    |  16 +-
 .../apache/hadoop/tools/DistCpConstants.java    |   3 +
 .../apache/hadoop/tools/DistCpOptionSwitch.java |  12 +-
 .../org/apache/hadoop/tools/DistCpOptions.java  |  34 ++
 .../org/apache/hadoop/tools/DistCpSync.java     | 192 ++++++++++
 .../org/apache/hadoop/tools/OptionsParser.java  |  24 +-
 .../hadoop/tools/mapred/CopyCommitter.java      |   3 +-
 .../org/apache/hadoop/tools/TestDistCpSync.java | 349 +++++++++++++++++++
 .../apache/hadoop/tools/TestOptionsParser.java  |  75 +++-
 12 files changed, 790 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 62006d3..3c6d447 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -704,6 +704,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7789. DFSck should resolve the path to support cross-FS symlinks.
     (gera)
 
+    HDFS-7535. Utilize Snapshot diff report for distcp. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
index a7b68a9..e3c58e9 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
@@ -224,7 +224,9 @@ public abstract class CopyListing extends Configured {
                                            Credentials credentials,
                                            DistCpOptions options)
       throws IOException {
-
+    if (options.shouldUseDiff()) {
+      return new GlobbedCopyListing(configuration, credentials);
+    }
     String copyListingClassName = configuration.get(DistCpConstants.
         CONF_LABEL_COPY_LISTING_CLASS, "");
     Class<? extends CopyListing> copyListingClass;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java
new file mode 100644
index 0000000..b617de7
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+
+/**
+ * Information presenting a rename/delete op derived from a snapshot diff entry.
+ * This includes the source file/dir of the rename/delete op, and the target
+ * file/dir of a rename op.
+ */
+class DiffInfo {
+  static final Comparator<DiffInfo> sourceComparator = new Comparator<DiffInfo>() {
+    @Override
+    public int compare(DiffInfo d1, DiffInfo d2) {
+      return d2.source.compareTo(d1.source);
+    }
+  };
+
+  static final Comparator<DiffInfo> targetComparator = new Comparator<DiffInfo>() {
+    @Override
+    public int compare(DiffInfo d1, DiffInfo d2) {
+      return d1.target == null ? -1 :
+          (d2.target ==  null ? 1 : d1.target.compareTo(d2.target));
+    }
+  };
+
+  /** The source file/dir of the rename or deletion op */
+  final Path source;
+  /**
+   * The intermediate file/dir for the op. For a rename or a delete op,
+   * we first rename the source to this tmp file/dir.
+   */
+  private Path tmp;
+  /** The target file/dir of the rename op. Null means the op is deletion. */
+  final Path target;
+
+  DiffInfo(Path source, Path target) {
+    assert source != null;
+    this.source = source;
+    this.target= target;
+  }
+
+  void setTmp(Path tmp) {
+    this.tmp = tmp;
+  }
+
+  Path getTmp() {
+    return tmp;
+  }
+
+  static DiffInfo[] getDiffs(SnapshotDiffReport report, Path targetDir) {
+    List<DiffInfo> diffs = new ArrayList<>();
+    for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
+      if (entry.getType() == SnapshotDiffReport.DiffType.DELETE) {
+        final Path source = new Path(targetDir,
+            DFSUtil.bytes2String(entry.getSourcePath()));
+        diffs.add(new DiffInfo(source, null));
+      } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) {
+        final Path source = new Path(targetDir,
+            DFSUtil.bytes2String(entry.getSourcePath()));
+        final Path target = new Path(targetDir,
+            DFSUtil.bytes2String(entry.getTargetPath()));
+        diffs.add(new DiffInfo(source, target));
+      }
+    }
+    return diffs.toArray(new DiffInfo[diffs.size()]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
index b80aeb8..ada4b25 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.tools;
 
+import java.io.IOException;
+import java.util.Random;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -27,10 +30,10 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.tools.CopyListing.*;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.apache.hadoop.tools.mapred.CopyOutputFormat;
@@ -39,9 +42,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import java.io.IOException;
-import java.util.Random;
-
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -62,7 +62,7 @@ public class DistCp extends Configured implements Tool {
    */
   static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
-  private static final Log LOG = LogFactory.getLog(DistCp.class);
+  static final Log LOG = LogFactory.getLog(DistCp.class);
 
   private DistCpOptions inputOptions;
   private Path metaFolder;
@@ -171,9 +171,13 @@ public class DistCp extends Configured implements Tool {
         //Don't cleanup while we are setting up.
         metaFolder = createMetaFolderPath();
         jobFS = metaFolder.getFileSystem(getConf());
-
         job = createJob();
       }
+      if (inputOptions.shouldUseDiff()) {
+        if (!DistCpSync.sync(inputOptions, getConf())) {
+          inputOptions.disableUsingDiff();
+        }
+      }
       createInputFileListing(job);
 
       job.submit();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 7e71096..a1af2af 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -53,6 +53,7 @@ public class DistCpConstants {
   public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
   public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
   public static final String CONF_LABEL_APPEND = "distcp.copy.append";
+  public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
   public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
   
   public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
@@ -134,4 +135,6 @@ public class DistCpConstants {
    * Value of reserved raw HDFS directory when copying raw.* xattrs.
    */
   static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
+
+  static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index 159d5ca..e9c7d46 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -41,8 +41,6 @@ public enum DistCpOptionSwitch {
    * target file. Note that when preserving checksum type, block size is also 
    * preserved.
    *
-   * @see PRESERVE_STATUS_DEFAULT
-   *
    * If any of the optional switches are present among rbugpcaxt, then
    * only the corresponding file attribute is preserved.
    */
@@ -149,6 +147,11 @@ public enum DistCpOptionSwitch {
       new Option("append", false,
           "Reuse existing data in target files and append new data to them if possible")),
 
+  DIFF(DistCpConstants.CONF_LABEL_DIFF,
+      new Option("diff", false,
+      "Use snapshot diff report to identify the difference between source and target"),
+      2),
+
   /**
    * Should DisctpExecution be blocking
    */
@@ -178,6 +181,11 @@ public enum DistCpOptionSwitch {
     this.option = option;
   }
 
+  DistCpOptionSwitch(String confLabel, Option option, int argNum) {
+    this(confLabel, option);
+    this.option.setArgs(argNum);
+  }
+
   /**
    * Get Configuration label for the option
    * @return configuration label name

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 57d2fb7..709e583 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -42,6 +42,7 @@ public class DistCpOptions {
   private boolean append = false;
   private boolean skipCRC = false;
   private boolean blocking = true;
+  private boolean useDiff = false;
 
   private int maxMaps = DistCpConstants.DEFAULT_MAPS;
   private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
@@ -61,6 +62,9 @@ public class DistCpOptions {
   private Path sourceFileListing;
   private List<Path> sourcePaths;
 
+  private String fromSnapshot;
+  private String toSnapshot;
+
   private Path targetPath;
 
   // targetPathExist is a derived field, it's initialized in the 
@@ -264,6 +268,29 @@ public class DistCpOptions {
     this.append = append;
   }
 
+  public boolean shouldUseDiff() {
+    return this.useDiff;
+  }
+
+  public String getFromSnapshot() {
+    return this.fromSnapshot;
+  }
+
+  public String getToSnapshot() {
+    return this.toSnapshot;
+  }
+
+  public void setUseDiff(boolean useDiff, String fromSnapshot, String toSnapshot) {
+    validate(DistCpOptionSwitch.DIFF, useDiff);
+    this.useDiff = useDiff;
+    this.fromSnapshot = fromSnapshot;
+    this.toSnapshot = toSnapshot;
+  }
+
+  public void disableUsingDiff() {
+    this.useDiff = false;
+  }
+
   /**
    * Should CRC/checksum check be skipped while checking files are identical
    *
@@ -508,6 +535,7 @@ public class DistCpOptions {
     boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
         value : this.skipCRC);
     boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
+    boolean useDiff = (option == DistCpOptionSwitch.DIFF ? value : this.useDiff);
 
     if (syncFolder && atomicCommit) {
       throw new IllegalArgumentException("Atomic commit can't be used with " +
@@ -536,6 +564,10 @@ public class DistCpOptions {
       throw new IllegalArgumentException(
           "Append is disallowed when skipping CRC");
     }
+    if ((!syncFolder || !deleteMissing) && useDiff) {
+      throw new IllegalArgumentException(
+          "Diff is valid only with update and delete options");
+    }
   }
 
   /**
@@ -556,6 +588,8 @@ public class DistCpOptions {
         String.valueOf(overwrite));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND,
         String.valueOf(append));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF,
+        String.valueOf(useDiff));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
         String.valueOf(skipCRC));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java
new file mode 100644
index 0000000..26d7eb4
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This class provides the basic functionality to sync two FileSystems based on
+ * the snapshot diff report. More specifically, we have the following settings:
+ * 1. Both the source and target FileSystem must be DistributedFileSystem
+ * 2. Two snapshots (e.g., s1 and s2) have been created on the source FS.
+ * The diff between these two snapshots will be copied to the target FS.
+ * 3. The target has the same snapshot s1. No changes have been made on the
+ * target since s1. All the files/directories in the target are the same with
+ * source.s1
+ */
+class DistCpSync {
+
+  static boolean sync(DistCpOptions inputOptions, Configuration conf)
+      throws IOException {
+    List<Path> sourcePaths = inputOptions.getSourcePaths();
+    if (sourcePaths.size() != 1) {
+      // we only support one source dir which must be a snapshottable directory
+      DistCp.LOG.warn(sourcePaths.size() + " source paths are provided");
+      return false;
+    }
+    final Path sourceDir = sourcePaths.get(0);
+    final Path targetDir = inputOptions.getTargetPath();
+
+    final FileSystem sfs = sourceDir.getFileSystem(conf);
+    final FileSystem tfs = targetDir.getFileSystem(conf);
+    // currently we require both the source and the target file system are
+    // DistributedFileSystem.
+    if (!(sfs instanceof DistributedFileSystem) ||
+        !(tfs instanceof DistributedFileSystem)) {
+      DistCp.LOG.warn("To use diff-based distcp, the FileSystems needs to" +
+          " be DistributedFileSystem");
+      return false;
+    }
+    final DistributedFileSystem sourceFs = (DistributedFileSystem) sfs;
+    final DistributedFileSystem targetFs= (DistributedFileSystem) tfs;
+
+    // make sure targetFS has no change between from and the current states
+    if (!checkNoChange(inputOptions, targetFs, targetDir)) {
+      return false;
+    }
+
+    Path tmpDir = null;
+    try {
+      tmpDir = createTargetTmpDir(targetFs, targetDir);
+      DiffInfo[] diffs = getDiffs(inputOptions, sourceFs, sourceDir, targetDir);
+      if (diffs == null) {
+        return false;
+      }
+      // do the real sync work: deletion and rename
+      syncDiff(diffs, targetFs, tmpDir);
+      return true;
+    } catch (Exception e) {
+      DistCp.LOG.warn("Failed to use snapshot diff for distcp", e);
+      return false;
+    } finally {
+      deleteTargetTmpDir(targetFs, tmpDir);
+      // TODO: since we have tmp directory, we can support "undo" with failures
+    }
+  }
+
+  private static Path createTargetTmpDir(DistributedFileSystem targetFs,
+      Path targetDir) throws IOException {
+    final Path tmp = new Path(targetDir,
+        DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt());
+    if (!targetFs.mkdirs(tmp)) {
+      throw new IOException("The tmp directory " + tmp + " already exists");
+    }
+    return tmp;
+  }
+
+  private static void deleteTargetTmpDir(DistributedFileSystem targetFs,
+      Path tmpDir) {
+    try {
+      if (tmpDir != null) {
+        targetFs.delete(tmpDir, true);
+      }
+    } catch (IOException e) {
+      DistCp.LOG.error("Unable to cleanup tmp dir: " + tmpDir, e);
+    }
+  }
+
+  /**
+   * Compute the snapshot diff on the given file system. Return true if the diff
+   * is empty, i.e., no changes have happened in the FS.
+   */
+  private static boolean checkNoChange(DistCpOptions inputOptions,
+      DistributedFileSystem fs, Path path) {
+    try {
+      SnapshotDiffReport targetDiff =
+          fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), "");
+      if (!targetDiff.getDiffList().isEmpty()) {
+        DistCp.LOG.warn("The target has been modified since snapshot "
+            + inputOptions.getFromSnapshot());
+        return false;
+      } else {
+        return true;
+      }
+    } catch (IOException e) {
+      DistCp.LOG.warn("Failed to compute snapshot diff on " + path, e);
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  static DiffInfo[] getDiffs(DistCpOptions inputOptions,
+      DistributedFileSystem fs, Path sourceDir, Path targetDir) {
+    try {
+      SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir,
+          inputOptions.getFromSnapshot(), inputOptions.getToSnapshot());
+      return DiffInfo.getDiffs(sourceDiff, targetDir);
+    } catch (IOException e) {
+      DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
+    }
+    return null;
+  }
+
+  private static void syncDiff(DiffInfo[] diffs,
+      DistributedFileSystem targetFs, Path tmpDir) throws IOException {
+    moveToTmpDir(diffs, targetFs, tmpDir);
+    moveToTarget(diffs, targetFs);
+  }
+
+  /**
+   * Move all the source files that should be renamed or deleted to the tmp
+   * directory.
+   */
+  private static void moveToTmpDir(DiffInfo[] diffs,
+      DistributedFileSystem targetFs, Path tmpDir) throws IOException {
+    // sort the diffs based on their source paths to make sure the files and
+    // subdirs are moved before moving their parents/ancestors.
+    Arrays.sort(diffs, DiffInfo.sourceComparator);
+    Random random = new Random();
+    for (DiffInfo diff : diffs) {
+      Path tmpTarget = new Path(tmpDir, diff.source.getName());
+      while (targetFs.exists(tmpTarget)) {
+        tmpTarget = new Path(tmpDir, diff.source.getName() + random.nextInt());
+      }
+      diff.setTmp(tmpTarget);
+      targetFs.rename(diff.source, tmpTarget);
+    }
+  }
+
+  /**
+   * Finish the rename operations: move all the intermediate files/directories
+   * from the tmp dir to the final targets.
+   */
+  private static void moveToTarget(DiffInfo[] diffs,
+      DistributedFileSystem targetFs) throws IOException {
+    // sort the diffs based on their target paths to make sure the parent
+    // directories are created first.
+    Arrays.sort(diffs, DiffInfo.targetComparator);
+    for (DiffInfo diff : diffs) {
+      if (diff.target != null) {
+        if (!targetFs.exists(diff.target.getParent())) {
+          targetFs.mkdirs(diff.target.getParent());
+        }
+        targetFs.rename(diff.getTmp(), diff.target);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 525136c..a3a76ef 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -18,13 +18,22 @@
 
 package org.apache.hadoop.tools;
 
-import org.apache.commons.cli.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 
-import java.util.*;
+import com.google.common.base.Preconditions;
 
 /**
  * The OptionsParser parses out the command-line options passed to DistCp,
@@ -207,6 +216,13 @@ public class OptionsParser {
       }
     }
 
+    if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
+      String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch());
+      Preconditions.checkArgument(snapshots != null && snapshots.length == 2,
+          "Must provide both the starting and ending snapshot names");
+      option.setUseDiff(true, snapshots[0], snapshots[1]);
+    }
+
     if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
       String fileLimitString = getVal(command,
                               DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
@@ -247,6 +263,10 @@ public class OptionsParser {
     }
   }
 
+  private static String[] getVals(CommandLine command, String option) {
+    return command.getOptionValues(option);
+  }
+
   public static void usage() {
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index d5fdd7f..f36ef77 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -90,7 +90,8 @@ public class CopyCommitter extends FileOutputCommitter {
     }
 
     try {
-      if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
+      if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)
+          && !(conf.getBoolean(DistCpConstants.CONF_LABEL_DIFF, false))) {
         deleteMissing(conf);
       } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
         commitData(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
new file mode 100644
index 0000000..7d5dad0
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.mapred.CopyMapper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestDistCpSync {
+  private MiniDFSCluster cluster;
+  private final Configuration conf = new HdfsConfiguration();
+  private DistributedFileSystem dfs;
+  private DistCpOptions options;
+  private final Path source = new Path("/source");
+  private final Path target = new Path("/target");
+  private final long BLOCK_SIZE = 1024;
+  private final short DATA_NUM = 1;
+
+  @Before
+  public void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build();
+    cluster.waitActive();
+
+    dfs = cluster.getFileSystem();
+    dfs.mkdirs(source);
+    dfs.mkdirs(target);
+
+    options = new DistCpOptions(Arrays.asList(source), target);
+    options.setSyncFolder(true);
+    options.setDeleteMissing(true);
+    options.setUseDiff(true, "s1", "s2");
+    options.appendToConf(conf);
+
+    conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
+    conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    IOUtils.cleanup(null, dfs);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test the sync returns false in the following scenarios:
+   * 1. the source/target dir are not snapshottable dir
+   * 2. the source/target does not have the given snapshots
+   * 3. changes have been made in target
+   */
+  @Test
+  public void testFallback() throws Exception {
+    // the source/target dir are not snapshottable dir
+    Assert.assertFalse(DistCpSync.sync(options, conf));
+
+    // the source/target does not have the given snapshots
+    dfs.allowSnapshot(source);
+    dfs.allowSnapshot(target);
+    Assert.assertFalse(DistCpSync.sync(options, conf));
+
+    dfs.createSnapshot(source, "s1");
+    dfs.createSnapshot(source, "s2");
+    dfs.createSnapshot(target, "s1");
+    Assert.assertTrue(DistCpSync.sync(options, conf));
+
+    // changes have been made in target
+    final Path subTarget = new Path(target, "sub");
+    dfs.mkdirs(subTarget);
+    Assert.assertFalse(DistCpSync.sync(options, conf));
+
+    dfs.delete(subTarget, true);
+    Assert.assertTrue(DistCpSync.sync(options, conf));
+  }
+
+  /**
+   * create some files and directories under the given directory.
+   * the final subtree looks like this:
+   *                     dir/
+   *              foo/          bar/
+   *           d1/    f1     d2/    f2
+   *         f3            f4
+   */
+  private void initData(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path d1 = new Path(foo, "d1");
+    final Path f1 = new Path(foo, "f1");
+    final Path d2 = new Path(bar, "d2");
+    final Path f2 = new Path(bar, "f2");
+    final Path f3 = new Path(d1, "f3");
+    final Path f4 = new Path(d2, "f4");
+
+    DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f4, BLOCK_SIZE, DATA_NUM, 0);
+  }
+
+  /**
+   * make some changes under the given directory (created in the above way).
+   * 1. rename dir/foo/d1 to dir/bar/d1
+   * 2. delete dir/bar/d1/f3
+   * 3. rename dir/foo to /dir/bar/d1/foo
+   * 4. delete dir/bar/d1/foo/f1
+   * 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE
+   * 6. append one BLOCK to file dir/bar/f2
+   * 7. rename dir/bar to dir/foo
+   *
+   * Thus after all these ops the subtree looks like this:
+   *                       dir/
+   *                       foo/
+   *                 d1/    f2(A)    d2/
+   *                foo/             f4
+   *                f1(new)
+   */
+  private void changeData(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path d1 = new Path(foo, "d1");
+    final Path f2 = new Path(bar, "f2");
+
+    final Path bar_d1 = new Path(bar, "d1");
+    dfs.rename(d1, bar_d1);
+    final Path f3 = new Path(bar_d1, "f3");
+    dfs.delete(f3, true);
+    final Path newfoo = new Path(bar_d1, "foo");
+    dfs.rename(foo, newfoo);
+    final Path f1 = new Path(newfoo, "f1");
+    dfs.delete(f1, true);
+    DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE);
+    dfs.rename(bar, new Path(dir, "foo"));
+  }
+
+  /**
+   * Test the basic functionality.
+   */
+  @Test
+  public void testSync() throws Exception {
+    initData(source);
+    initData(target);
+    dfs.allowSnapshot(source);
+    dfs.allowSnapshot(target);
+    dfs.createSnapshot(source, "s1");
+    dfs.createSnapshot(target, "s1");
+
+    // make changes under source
+    changeData(source);
+    dfs.createSnapshot(source, "s2");
+
+    // do the sync
+    Assert.assertTrue(DistCpSync.sync(options, conf));
+
+    // build copy listing
+    final Path listingPath = new Path("/tmp/META/fileList.seq");
+    CopyListing listing = new GlobbedCopyListing(conf, new Credentials());
+    listing.buildListing(listingPath, options);
+
+    Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(conf, null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        stubContext.getContext();
+    // Enable append
+    context.getConfiguration().setBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), true);
+    copyMapper.setup(context);
+    for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing.entrySet()) {
+      copyMapper.map(entry.getKey(), entry.getValue(), context);
+    }
+
+    // verify that we only copied new appended data of f2 and the new file f1
+    Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter()
+        .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+
+    // verify the source and target now has the same structure
+    verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
+  }
+
+  private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
+      throws Exception {
+    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(listingPath));
+    Text key = new Text();
+    CopyListingFileStatus value = new CopyListingFileStatus();
+    Map<Text, CopyListingFileStatus> values = new HashMap<>();
+    while (reader.next(key, value)) {
+      values.put(key, value);
+      key = new Text();
+      value = new CopyListingFileStatus();
+    }
+    return values;
+  }
+
+  private void verifyCopy(FileStatus s, FileStatus t, boolean compareName)
+      throws Exception {
+    Assert.assertEquals(s.isDirectory(), t.isDirectory());
+    if (compareName) {
+      Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
+    }
+    if (!s.isDirectory()) {
+      // verify the file content is the same
+      byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
+      byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
+      Assert.assertArrayEquals(sbytes, tbytes);
+    } else {
+      FileStatus[] slist = dfs.listStatus(s.getPath());
+      FileStatus[] tlist = dfs.listStatus(t.getPath());
+      Assert.assertEquals(slist.length, tlist.length);
+      for (int i = 0; i < slist.length; i++) {
+        verifyCopy(slist[i], tlist[i], true);
+      }
+    }
+  }
+
+  private void initData2(Path dir) throws Exception {
+    final Path test = new Path(dir, "test");
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path f1 = new Path(test, "f1");
+    final Path f2 = new Path(foo, "f2");
+    final Path f3 = new Path(bar, "f3");
+
+    DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 2L);
+  }
+
+  private void changeData2(Path dir) throws Exception {
+    final Path tmpFoo = new Path(dir, "tmpFoo");
+    final Path test = new Path(dir, "test");
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+
+    dfs.rename(test, tmpFoo);
+    dfs.rename(foo, test);
+    dfs.rename(bar, foo);
+    dfs.rename(tmpFoo, bar);
+  }
+
+  @Test
+  public void testSync2() throws Exception {
+    initData2(source);
+    initData2(target);
+    dfs.allowSnapshot(source);
+    dfs.allowSnapshot(target);
+    dfs.createSnapshot(source, "s1");
+    dfs.createSnapshot(target, "s1");
+
+    // make changes under source
+    changeData2(source);
+    dfs.createSnapshot(source, "s2");
+
+    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
+    System.out.println(report);
+
+    // do the sync
+    Assert.assertTrue(DistCpSync.sync(options, conf));
+    verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
+  }
+
+  private void initData3(Path dir) throws Exception {
+    final Path test = new Path(dir, "test");
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path f1 = new Path(test, "file");
+    final Path f2 = new Path(foo, "file");
+    final Path f3 = new Path(bar, "file");
+
+    DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE * 2, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE * 3, DATA_NUM, 2L);
+  }
+
+  private void changeData3(Path dir) throws Exception {
+    final Path test = new Path(dir, "test");
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path f1 = new Path(test, "file");
+    final Path f2 = new Path(foo, "file");
+    final Path f3 = new Path(bar, "file");
+    final Path newf1 = new Path(test, "newfile");
+    final Path newf2 = new Path(foo, "newfile");
+    final Path newf3 = new Path(bar, "newfile");
+
+    dfs.rename(f1, newf1);
+    dfs.rename(f2, newf2);
+    dfs.rename(f3, newf3);
+  }
+
+  /**
+   * Test a case where there are multiple source files with the same name
+   */
+  @Test
+  public void testSync3() throws Exception {
+    initData3(source);
+    initData3(target);
+    dfs.allowSnapshot(source);
+    dfs.allowSnapshot(target);
+    dfs.createSnapshot(source, "s1");
+    dfs.createSnapshot(target, "s1");
+
+    // make changes under source
+    changeData3(source);
+    dfs.createSnapshot(source, "s2");
+
+    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
+    System.out.println(report);
+
+    // do the sync
+    Assert.assertTrue(DistCpSync.sync(options, conf));
+    verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed70fa14/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index 30fb25b..cc9da33 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -584,7 +584,7 @@ public class TestOptionsParser {
 
     // make sure -append is only valid when -update is specified
     try {
-      options = OptionsParser.parse(new String[] { "-append",
+      OptionsParser.parse(new String[] { "-append",
               "hdfs://localhost:8020/source/first",
               "hdfs://localhost:8020/target/" });
       fail("Append should fail if update option is not specified");
@@ -595,7 +595,7 @@ public class TestOptionsParser {
 
     // make sure -append is invalid when skipCrc is specified
     try {
-      options = OptionsParser.parse(new String[] {
+      OptionsParser.parse(new String[] {
           "-append", "-update", "-skipcrccheck",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/target/" });
@@ -605,4 +605,75 @@ public class TestOptionsParser {
           "Append is disallowed when skipping CRC", e);
     }
   }
+
+  @Test
+  public void testDiffOption() {
+    Configuration conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(),
+        false));
+
+    DistCpOptions options = OptionsParser.parse(new String[] { "-update",
+        "-delete", "-diff", "s1", "s2",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/" });
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), false));
+    Assert.assertTrue(options.shouldUseDiff());
+    Assert.assertEquals("s1", options.getFromSnapshot());
+    Assert.assertEquals("s2", options.getToSnapshot());
+
+    options = OptionsParser.parse(new String[] {
+        "-delete", "-diff", "s1", ".", "-update",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/" });
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(),
+        false));
+    Assert.assertTrue(options.shouldUseDiff());
+    Assert.assertEquals("s1", options.getFromSnapshot());
+    Assert.assertEquals(".", options.getToSnapshot());
+
+    // -diff requires two option values
+    try {
+      OptionsParser.parse(new String[] {"-diff", "s1", "-delete", "-update",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/" });
+      fail("-diff should fail with only one snapshot name");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Must provide both the starting and ending snapshot names", e);
+    }
+
+    // make sure -diff is only valid when -update and -delete is specified
+    try {
+      OptionsParser.parse(new String[] { "-diff", "s1", "s2",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/" });
+      fail("-diff should fail if -update or -delete option is not specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Diff is valid only with update and delete options", e);
+    }
+
+    try {
+      OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/" });
+      fail("-diff should fail if -update or -delete option is not specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Diff is valid only with update and delete options", e);
+    }
+
+    try {
+      OptionsParser.parse(new String[] { "-diff", "s1", "s2",
+          "-delete", "-overwrite",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/" });
+      fail("-diff should fail if -update or -delete option is not specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Diff is valid only with update and delete options", e);
+    }
+  }
 }