Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/04/29 13:50:00 UTC

[jira] [Work logged] (HIVE-24852) Add support for Snapshots during external table replication

     [ https://issues.apache.org/jira/browse/HIVE-24852?focusedWorklogId=590998&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-590998 ]

ASF GitHub Bot logged work on HIVE-24852:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Apr/21 13:49
            Start Date: 29/Apr/21 13:49
    Worklog Time Spent: 10m 
      Work Description: aasha commented on a change in pull request #2043:
URL: https://github.com/apache/hive/pull/2043#discussion_r619969098



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
##########
@@ -544,12 +544,13 @@ public static void assertExternalFileList(List<String> expected, String dumploca
     Set<String> tableNames = new HashSet<>();
     for (String line = reader.readLine(); line != null; line = reader.readLine()) {
       String[] components = line.split(DirCopyWork.URI_SEPARATOR);
-      Assert.assertEquals("The file should have sourcelocation#targetlocation#tblName",
-              3, components.length);
+      Assert.assertEquals("The file should have sourcelocation#targetlocation#tblName#copymode", 5,

Review comment:
       do we need to write copymode for each table location?
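
      For reference, a serialized entry would then presumably look like this (hypothetical paths; the fifth
      component is assumed to be the snapshot prefix, going by the DirCopyWork constructor shown further below):

          hdfs://src:8020/warehouse/tbl1#hdfs://tgt:8020/warehouse/tbl1#tbl1#DIFF_COPY#replSnap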

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -342,7 +343,12 @@ public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) {
 
   public static int handleException(boolean isReplication, Throwable e, String nonRecoverablePath,
                                     ReplicationMetricCollector metricCollector, String stageName, HiveConf conf){
-    int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    int errorCode;
+    if (isReplication && e instanceof SnapshotException) {
+      errorCode = ErrorMsg.getErrorMsg("SNAPSHOT_ERROR").getErrorCode();

Review comment:
      Why is this handled differently? Can't we compare based on the error message?
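
      I.e., keep the pre-change pattern (this is just the line the diff removes, sketched here as the
      alternative being asked about):

          int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();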

##########
File path: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
##########
@@ -691,6 +691,24 @@ public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
     return copied;
   }
 
+  public static boolean distCpWithSnapshot(String snap1, String snap2, List<Path> srcPaths, Path dst,
+      UserGroupInformation proxyUser, HiveConf conf, HadoopShims shims) {
+    boolean copied = false;
+    try {
+      if (proxyUser == null) {
+        copied = shims.runDistCpWithSnapshots(snap1, snap2, srcPaths, dst, conf);

Review comment:
      Rename these to snapshotNew and snapshotOld.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -162,7 +194,77 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+  }
+
+  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
+      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    if (!isSnapshotEnabled) {
+      LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
+      return FALLBACK_COPY;
+    }
+    DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
+    try {
+      if(isBootstrap) {
+        // Delete any pre existing snapshots.
+        SnapshotUtils.deleteSnapshotSafe(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix));

Review comment:
      If deletion fails, what happens?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
##########
@@ -24,7 +24,7 @@
 
 class AlterDatabaseHandler extends AbstractEventHandler<AlterDatabaseMessage> {
 
-  AlterDatabaseHandler(NotificationEvent event) {
+  AlterDatabaseHandler(NotificationEvent event)  {

Review comment:
       remove the space

##########
File path: shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
##########
@@ -1197,6 +1224,103 @@ public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) thro
     }
   }
 
+  public boolean runDistCpWithSnapshots(String snap1, String snap2, List<Path> srcPaths, Path dst, Configuration conf)
+      throws IOException {
+    DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst).withSyncFolder(true).withUseDiff(snap1, snap2)
+        .preserve(FileAttribute.BLOCKSIZE).preserve(FileAttribute.XATTR).build();
+
+    List<String> params = constructDistCpWithSnapshotParams(srcPaths, dst, snap1, snap2, conf, "-diff");
+    try {
+      conf.setBoolean("mapred.mapper.new-api", true);
+      DistCp distcp = new DistCp(conf, options);
+      int returnCode = distcp.run(params.toArray(new String[0]));
+      if (returnCode == 0) {
+        return true;
+      } else if (returnCode == DistCpConstants.INVALID_ARGUMENT) {
+        // Handle FileNotFoundException: if the source got deleted, we don't want to copy either. This is
+        // effectively a success case; there was nothing to copy and nothing was copied, so we need not fail.
+        LOG.warn("Copy failed with INVALID_ARGUMENT for source: {} to target: {} snapshot1: {} snapshot2: {} "
+            + "params: {}", srcPaths, dst, snap1, snap2, params);
+        return true;
+      } else if (returnCode == DistCpConstants.UNKNOWN_ERROR && conf
+          .getBoolean("hive.repl.externaltable.snapshot.overwrite.target", true)) {
+        // Check if this error is due to target modified.
+        if (shouldRdiff(dst, conf, snap1)) {
+          LOG.warn("Copy failed due to target modified. Attempting to restore back the target. source: {} target: {} "
+              + "snapshot: {}", srcPaths, dst, snap1);
+          List<String> rParams = constructDistCpWithSnapshotParams(srcPaths, dst, ".", snap1, conf, "-rdiff");
+          DistCp rDistcp = new DistCp(conf, options);
+          returnCode = rDistcp.run(rParams.toArray(new String[0]));
+          if (returnCode == 0) {
+            LOG.info("Target restored to previous state.  source: {} target: {} snapshot: {}. Reattempting to copy.",
+                srcPaths, dst, snap1);
+            dst.getFileSystem(conf).deleteSnapshot(dst, snap1);
+            dst.getFileSystem(conf).createSnapshot(dst, snap1);
+            returnCode = distcp.run(params.toArray(new String[0]));
+            if (returnCode == 0) {
+              return true;
+            } else {
+              LOG.error("Copy failed with after target restore for source: {} to target: {} snapshot1: {} snapshot2: "
+                  + "{} params: {}. Return code: {}", srcPaths, dst, snap1, snap2, params, returnCode);
+              return false;
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Cannot execute DistCp process: " + e, e);

Review comment:
      Add {} and just pass e; no need to append e to the message as well.
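
      A minimal sketch of that suggestion (hypothetical; {} substitution applies to the log call, while for the
      exception itself chaining the cause is enough):

          } catch (Exception e) {
            // Log with a {} placeholder and pass e as the last argument.
            LOG.error("Cannot execute DistCp process for sources: {}", srcPaths, e);
            // The cause is chained, so the message need not embed e.
            throw new IOException("Cannot execute DistCp process", e);
          }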

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
##########
@@ -22,11 +22,14 @@
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Set;
 
 abstract class AbstractMessageHandler implements MessageHandler {
+  static final Logger LOG = LoggerFactory.getLogger(AbstractMessageHandler.class);

Review comment:
       unused?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -218,4 +219,62 @@ public String getName() {
   public boolean canExecuteInParallel() {
     return true;
   }
+
+  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInformation proxyUser) throws IOException {
+
+    DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, conf);
+    boolean result = false;
+    if (getWork().getCopyMode().equals(SnapshotUtils.SnapshotCopyMode.DIFF_COPY)) {
+      LOG.info("Using snapshot diff copy for source: {} and target: {}", sourcePath, targetPath);
+       result = FileUtils
+          .distCpWithSnapshot(firstSnapshot(work.getSnapshotPrefix()), secondSnapshot(work.getSnapshotPrefix()),
+              Collections.singletonList(sourcePath), targetPath, proxyUser,
+              conf, ShimLoader.getHadoopShims());
+       if(result) {
+         // Delete the older snapshot from last iteration.
+         targetFs.deleteSnapshot(targetPath, firstSnapshot(work.getSnapshotPrefix()));
+       } else {
+         throw new IOException(
+             "Can not successfully copy external table data using snapshot diff. source:" + sourcePath + " and target: "
+                 + targetPath);
+       }
+    } else if (getWork().getCopyMode().equals(SnapshotUtils.SnapshotCopyMode.INITIAL_COPY)) {
+      LOG.info("Using snapshot initial copy for source: {} and target: {}", sourcePath, targetPath);
+      // Get the path relative to the initial snapshot for copy.
+      Path snapRelPath =
+          new Path(sourcePath, HdfsConstants.DOT_SNAPSHOT_DIR + "/" + secondSnapshot(work.getSnapshotPrefix()));
+
+      // This is the first time we are copying, check if the target is snapshottable or not, if not attempt to allow
+      // snapshots.
+      SnapshotUtils.allowSnapshot(targetFs, targetPath, conf);
+      // Attempt to delete the snapshot, in case this is a bootstrap post a failed incremental, Since in case of
+      // bootstrap we go from start, so delete any pre-existing snapshot.
+      SnapshotUtils.deleteSnapshotSafe(targetFs, targetPath, firstSnapshot(work.getSnapshotPrefix()));
+
+      // Copy from the initial snapshot path.
+      result = runFallbackDistCp(snapRelPath, targetPath, proxyUser);
+    }
+
+    // Create a new snapshot at target Filesystem. For the next iteration.
+    if (result) {
+      SnapshotUtils.createSnapshot(targetFs, targetPath, firstSnapshot(work.getSnapshotPrefix()), conf);
+    }
+    return result;
+  }
+
+  private boolean runFallbackDistCp(Path sourcePath, Path targetPath, UserGroupInformation proxyUser)
+      throws IOException {
+     // do we create a new conf and only here provide this additional option so that we get away from

Review comment:
       remove this comment and incorporate the change

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -218,4 +219,62 @@ public String getName() {
   public boolean canExecuteInParallel() {
     return true;
   }
+
+  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInformation proxyUser) throws IOException {
+
+    DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, conf);
+    boolean result = false;
+    if (getWork().getCopyMode().equals(SnapshotUtils.SnapshotCopyMode.DIFF_COPY)) {
+      LOG.info("Using snapshot diff copy for source: {} and target: {}", sourcePath, targetPath);
+       result = FileUtils
+          .distCpWithSnapshot(firstSnapshot(work.getSnapshotPrefix()), secondSnapshot(work.getSnapshotPrefix()),
+              Collections.singletonList(sourcePath), targetPath, proxyUser,
+              conf, ShimLoader.getHadoopShims());
+       if(result) {
+         // Delete the older snapshot from last iteration.
+         targetFs.deleteSnapshot(targetPath, firstSnapshot(work.getSnapshotPrefix()));
+       } else {
+         throw new IOException(
+             "Can not successfully copy external table data using snapshot diff. source:" + sourcePath + " and target: "
+                 + targetPath);
+       }
+    } else if (getWork().getCopyMode().equals(SnapshotUtils.SnapshotCopyMode.INITIAL_COPY)) {
+      LOG.info("Using snapshot initial copy for source: {} and target: {}", sourcePath, targetPath);
+      // Get the path relative to the initial snapshot for copy.
+      Path snapRelPath =
+          new Path(sourcePath, HdfsConstants.DOT_SNAPSHOT_DIR + "/" + secondSnapshot(work.getSnapshotPrefix()));
+
+      // This is the first time we are copying, check if the target is snapshottable or not, if not attempt to allow
+      // snapshots.
+      SnapshotUtils.allowSnapshot(targetFs, targetPath, conf);
+      // Attempt to delete the snapshot, in case this is a bootstrap post a failed incremental, Since in case of
+      // bootstrap we go from start, so delete any pre-existing snapshot.
+      SnapshotUtils.deleteSnapshotSafe(targetFs, targetPath, firstSnapshot(work.getSnapshotPrefix()));

Review comment:
      If deletion fails, what happens?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1012,14 +1070,17 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           }
         }
         replLogger.endLog(bootDumpBeginReplId.toString());
-        work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
+        work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId, replSnapshotCount);
       }
       work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
       setDataCopyIterators(extTableFileList, managedTblList);
       LOG.info("Preparing to return {},{}->{}",
         dumpRoot.toUri(), bootDumpBeginReplId, currentNotificationId(hiveDb));
       return bootDumpBeginReplId;
     } finally {
+      if (snapPathFileList != null) {

Review comment:
      This can be part of the try-with-resources block.
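
      A minimal sketch of that restructuring, assuming FileList implements AutoCloseable (hypothetical shape,
      surrounding dump logic elided):

          try (FileList snapPathFileList = createTableFileList(dumpRoot,
              EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf)) {
            // ... existing bootstrap dump logic ...
            return bootDumpBeginReplId;
          } // closed automatically, so the null check in the finally block goes away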

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
##########
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;

Review comment:
       unused imports?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_EXTERNAL_TABLE_PATHS;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask.createTableFileList;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.externalTableDataPath;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.getExternalTableBaseDir;
+
+/**
+ * Utility class for snapshot related operations.
+ */
+public class SnapshotUtils {
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(SnapshotUtils.class);
+
+  public static final String OLD_SNAPSHOT = "replOld";
+  public static final String NEW_SNAPSHOT = "replNew";
+
+  /**
+   * Gets a DistributedFileSystem object if possible from a path.
+   * @param path path from which DistributedFileSystem needs to be extracted.
+   * @param conf Hive Configuration.
+   * @return DFS or null.
+   */
+  public static DistributedFileSystem getDFS(Path path, HiveConf conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    if (fs instanceof DistributedFileSystem) {
+      return (DistributedFileSystem) fs;
+    } else {
+      LOG.error("FileSystem for {} is not DistributedFileSystem", path);
+      throw new IOException("The filesystem for path {} is {}, The filesystem should be DistributedFileSystem to "
+          + "support snapshot based copy.");
+    }
+  }
+
+  /**
+   *  Checks whether a given snapshot exists or not.
+   * @param dfs DistributedFileSystem.
+   * @param path path of snapshot.
+   * @param snapshotPrefix snapshot name prefix.
+   * @param snapshotName name of snapshot.
+   * @param conf Hive configuration.
+   * @return true if the snapshot exists.
+   * @throws IOException in case of any error.
+   */
+  public static boolean isSnapshotAvailable(DistributedFileSystem dfs, Path path, String snapshotPrefix,
+      String snapshotName, HiveConf conf) throws IOException {
+    AtomicBoolean isSnapAvlb = new AtomicBoolean(false);
+    Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class)
+        .withFailOnException(SnapshotException.class).build();
+    try {
+      retryable.executeCallable(() -> {
+        isSnapAvlb
+            .set(dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR + "/" + snapshotPrefix + snapshotName)));
+        LOG.debug("Snapshot for path {} is {}", path, isSnapAvlb.get() ? "available" : "unavailable");
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SnapshotException("Failed to check if snapshot is available on " + path, e);
+    }
+    return isSnapAvlb.get();
+  }
+
+  /**
+   * Attempts deleting a snapshot, if exists.
+   * @param dfs DistributedFileSystem
+   * @param snapshotPath path of snapshot
+   * @param snapshotName name of the snapshot
+   * @return true if the snapshot gets deleted.
+   */
+  public static boolean deleteSnapshotSafe(DistributedFileSystem dfs, Path snapshotPath, String snapshotName)
+      throws IOException {
+    try {
+      dfs.deleteSnapshot(snapshotPath, snapshotName);
+      return true;
+    } catch (SnapshotException e) {
+      LOG.debug("Couldn't delete the snapshot {} under path {}", snapshotName, snapshotPath, e);
+    } catch (FileNotFoundException fnf) {
+      LOG.warn("Couldn't delete the snapshot {} under path {}", snapshotName, snapshotPath, fnf);
+    }
+    return false;
+  }
+
+  /**
+   *  Attempts to disallow snapshot on a path, ignoring exceptions.
+   * @param dfs DistributedFileSystem
+   * @param snapshotPath path of snapshot
+   */
+  public static void disallowSnapshot(DistributedFileSystem dfs, Path snapshotPath) {
+    try {
+      // Check if the directory is snapshottable.
+      if (dfs.getFileStatus(snapshotPath).isSnapshotEnabled()) {
+        dfs.disallowSnapshot(snapshotPath);
+      }
+    } catch (Exception e) {
+      LOG.warn("Could not disallow snapshot for path {}", snapshotPath, e);
+    }
+  }
+
+  /**
+   * Attempts to allow snapshots on the path, with retry.
+   * @param dfs DistributedFileSystem.
+   * @param snapshotPath path of snapshot.
+   * @param conf Hive Configuration.
+   */
+  public static void allowSnapshot(DistributedFileSystem dfs, Path snapshotPath, HiveConf conf) throws IOException {
+    // Check if the directory is already snapshottable.
+    Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class)
+        .withFailOnException(SnapshotException.class).build();
+    try {
+      retryable.executeCallable(() -> {
+        try {
+          if (!dfs.getFileStatus(snapshotPath).isSnapshotEnabled()) {
+            dfs.allowSnapshot(snapshotPath);
+          }
+        } catch (FileNotFoundException fnf) {
+          // Source got deleted, we can ignore.
+          LOG.info("Failed to allow snapshot for {} since the path got deleted", snapshotPath);
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SnapshotException("Failed to AllowSnapshot on " + snapshotPath, e);
+    }
+  }
+
+  /**
+   * Specifies the mode of copy when using Snapshots for replication.
+   */
+  public enum SnapshotCopyMode {
+    INITIAL_COPY, // Copying for the first time, only one snapshot exists.
+    DIFF_COPY, // Copying when a pair of snapshots is present.
+    FALLBACK_COPY // Indicates that a normal copy, without snapshots, should be used.
+  }
+
+  /**
+   *  Attempts creating a snapshot, with retries.
+   * @param fs FileSystem object.
+   * @param snapshotPath Path of the snapshot.
+   * @param snapshotName Name of the snapshot.
+   * @param conf Hive Configuration.
+   */
+  public static void createSnapshot(FileSystem fs, Path snapshotPath, String snapshotName, HiveConf conf)
+      throws IOException {
+    Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class)
+        .withFailOnException(SnapshotException.class).build();
+    try {
+      retryable.executeCallable(() -> {
+        try {
+          fs.createSnapshot(snapshotPath, snapshotName);
+        } catch (FileNotFoundException e) {
+          LOG.warn("Couldn't create the snapshot {} under path {}", snapshotName, snapshotPath, e);
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SnapshotException(
+          "Unable to create snapshot for path: " + snapshotPath + " snapshot name: " + snapshotName, e);
+    }
+  }
+
+  /**
+   * Renames a snapshot with retries.
+   * @param fs the filesystem.
+   * @param snapshotPath the path where the snapshot lies.
+   * @param sourceSnapshotName current snapshot name.
+   * @param targetSnapshotName new snapshot name.
+   * @param conf configuration.
+   * @throws IOException in case of failure.
+   */
+  public static void renameSnapshot(FileSystem fs, Path snapshotPath, String sourceSnapshotName,
+      String targetSnapshotName, HiveConf conf) throws IOException {
+    Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class)
+        .withFailOnException(SnapshotException.class).build();
+    try {
+      retryable.executeCallable(() -> {
+        try {
+          fs.renameSnapshot(snapshotPath, sourceSnapshotName, targetSnapshotName);
+        } catch (FileNotFoundException e) {
+          LOG.warn("Couldn't rename the snapshot {} to {} under path {}", sourceSnapshotName, targetSnapshotName,
+              snapshotPath, e);
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SnapshotException(
+          "Unable to rename snapshot " + sourceSnapshotName + " to " + targetSnapshotName + " for path: "
+              + snapshotPath, e);
+    }
+  }
+
+  /**
+   * Extracts the entries from FileList into an ArrayList
+   * @param fl the FileList
+   * @return the ArrayList containing the entries.
+   */
+  public static ArrayList<String> getListFromFileList(FileList fl) {
+    ArrayList<String> result = new ArrayList<>();
+    while (fl.hasNext()) {
+      result.add(fl.next());
+    }
+    return result;
+  }
+
+  /**
+   *  Deletes the snapshots present in the list.
+   * @param dfs DistributedFileSystem.
+   * @param diffList Elements to be deleted.
+   * @param prefix Prefix used in snapshot names.
+   * @param snapshotCount snapshot counter to track the number of snapshots deleted.
+   * @throws IOException in case of any error.
+   */
+  private static void cleanUpSnapshots(DistributedFileSystem dfs, ArrayList<String> diffList, String prefix,
+      ReplSnapshotCount snapshotCount) throws IOException {
+    for (String path : diffList) {
+      Path snapshotPath = new Path(path);
+      boolean isFirstDeleted = deleteSnapshotSafe(dfs, snapshotPath, firstSnapshot(prefix));
+      boolean isSecondDeleted = deleteSnapshotSafe(dfs, snapshotPath, secondSnapshot(prefix));
+      disallowSnapshot(dfs, snapshotPath);
+      if (snapshotCount != null) {
+        if (isFirstDeleted) {
+          snapshotCount.incrementNumDeleted();
+        }
+        if (isSecondDeleted) {
+          snapshotCount.incrementNumDeleted();
+        }
+      }
+    }
+  }
+
+  private static ArrayList<String> getDiffList(ArrayList<String> newList, ArrayList<String> oldList, HiveConf conf,
+      boolean isLoad) throws SemanticException {
+    ArrayList<String> diffList = new ArrayList<>();
+    for (String oldPath : oldList) {
+      if (!newList.contains(oldPath)) {
+        if (isLoad) {
+          diffList.add(externalTableDataPath(conf, getExternalTableBaseDir(conf), new Path(oldPath)).toString());
+        } else {
+          diffList.add(oldPath);
+        }
+      }
+    }
+    return diffList;
+  }
+
+  /**
+   * Cleans up stale snapshots by computing the diff between the snapshot path lists of two replication dumps.
+   * @param dumpRoot Root of the dump
+   * @param snapshotPrefix prefix used by the snapshot
+   * @param conf Hive Configuration.
+   * @param snapshotCount the counter to store number of deleted snapshots.
+   * @param isLoad Whether this is called for cleaning up snapshots at the target cluster; in case of load it
+   *               renames the snapshot file as well.
+   * @throws IOException
+   * @throws SemanticException
+   */
+  public static void cleanupSnapshots(Path dumpRoot, String snapshotPrefix, HiveConf conf,
+      ReplSnapshotCount snapshotCount, boolean isLoad) throws IOException, SemanticException {
+    DistributedFileSystem dfs = (DistributedFileSystem) dumpRoot.getFileSystem(conf);
+    if (dfs.exists(new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD))) {
+      FileList snapOld = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, conf);
+      FileList snapNew = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf);
+      ArrayList<String> oldPaths = SnapshotUtils.getListFromFileList(snapOld);
+      ArrayList<String> newPaths = SnapshotUtils.getListFromFileList(snapNew);
+      ArrayList<String> diffList = SnapshotUtils.getDiffList(newPaths, oldPaths, conf, isLoad);
+      dfs = isLoad ? (DistributedFileSystem) getExternalTableBaseDir(conf).getFileSystem(conf) : dfs;
+      SnapshotUtils.cleanUpSnapshots(dfs, diffList, snapshotPrefix, snapshotCount);
+    }
+    if (isLoad) {
+      try {
+        dfs.delete((new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD)), true);
+      } catch (FileNotFoundException fnf) {
+        // ignore
+        LOG.warn("Failed to clean up snapshot " + EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, fnf);
+      }
+      try {
+        dfs.rename(new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
+            new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD), Options.Rename.OVERWRITE);
+      } catch (FileNotFoundException fnf) {
+        // ignore
+        LOG.warn("Failed to clean up snapshot " + EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, fnf);
+      }
+    }
+  }
+
+  public static boolean shouldRdiff(Path p, HiveConf conf, String snapshot) {
+    try {
+      DistributedFileSystem dfs = getDFS(p, conf);
+      // check if the target got modified
+      int index =
+          dfs.getClient().getSnapshotDiffReportListing(p.toUri().getPath(), snapshot, "", DFSUtilClient.EMPTY_BYTES, -1)
+              .getLastIndex();
+
+      return index > -1;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  public static void deleteReplRelatedSnapshots(FileSystem fs, Path path) {
+    try {
+      FileStatus[] listing = fs.listStatus(new Path(path, ".snapshot"));
+      for (FileStatus elem : listing) {
+        if (elem.getPath().getName().contains(OLD_SNAPSHOT) || elem.getPath().getName().contains(NEW_SNAPSHOT)) {
+          deleteSnapshotSafe((DistributedFileSystem) fs, path, elem.getPath().getName());

Review comment:
      Can we leak snapshots here? We don't check whether the delete was successful.
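
      A minimal sketch of the kind of check meant here (hypothetical; deleteSnapshotSafe already returns a
      boolean, per SnapshotUtils above):

          boolean deleted = deleteSnapshotSafe((DistributedFileSystem) fs, path, elem.getPath().getName());
          if (!deleted) {
            // Surface a potential snapshot leak instead of silently ignoring it.
            LOG.warn("Snapshot {} under {} could not be deleted and may leak", elem.getPath().getName(), path);
          }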

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -668,6 +668,18 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         + " table or partition level. If hive.exec.parallel \n"
         + "is set to true then max worker threads created for copy can be hive.exec.parallel.thread.number(determines \n"
         + "number of copy tasks in parallel) * hive.repl.parallel.copy.tasks "),
+    REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY("hive.repl.externaltable.snapshotdiff.copy",
+        false, "Use snapshot diff for copying data from the source to the "
+        + "destination cluster for external tables in distcp."),
+    REPL_SNAPSHOT_OVERWRITE_TARGET_FOR_EXTERNAL_TABLE_COPY("hive.repl.externaltable.snapshot.overwrite.target",
+        true, "If this is enabled and the target is modified while using snapshots for external table "
+        + "data copy, the target data is overwritten, the modifications are removed, and the copy is "
+        + "reattempted using the snapshot based approach. If disabled, the replication will fail in case "
+        + "the target is modified."),
+    REPL_SNAPSHOT_EXTERNAL_TABLE_PATHS("hive.repl.externaltable.snapshot.paths",

Review comment:
      Do we need this config as well, or is the single copy one sufficient?

##########
File path: common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
##########
@@ -626,7 +626,8 @@
   REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40011, "Replication failed with non recoverable error. Needs manual intervention"),
   REPL_INVALID_ARGUMENTS(40012, "Invalid arguments error : {0}.", true),
   REPL_INVALID_ALTER_TABLE(40013, "{0}Unable to alter table{1}", true),
-  REPL_PERMISSION_DENIED(40014, "{0}org.apache.hadoop.security.AccessControlException{1}", true)
+  REPL_PERMISSION_DENIED(40014, "{0}org.apache.hadoop.security.AccessControlException{1}", true),
+  REPL_SNAPSHOT_EXCEPTION(40015, "SNAPSHOT_ERROR", true)

Review comment:
      Rename this to indicate a distcp snapshot error.
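
      E.g. (hypothetical naming):

          REPL_DISTCP_SNAPSHOT_ERROR(40015, "DISTCP_SNAPSHOT_ERROR", true)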




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 590998)
    Time Spent: 3h 20m  (was: 3h 10m)

> Add support for Snapshots during external table replication
> -----------------------------------------------------------
>
>                 Key: HIVE-24852
>                 URL: https://issues.apache.org/jira/browse/HIVE-24852
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Ayush Saxena
>            Assignee: Ayush Saxena
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: Design Doc HDFS Snapshots for External Table Replication-01.pdf
>
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Add support for use of snapshot diff for external table replication.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)