Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/02/08 06:19:38 UTC

[GitHub] [hive] ArkoSharma opened a new pull request #2539: HIVE-25397: snapshot support for controlled failover

ArkoSharma opened a new pull request #2539:
URL: https://github.com/apache/hive/pull/2539


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r759026652



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1105,17 +1105,15 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           boolean isExternalTablePresent = false;
 
           String snapshotPrefix = dbName.toLowerCase();
-          ArrayList<String> prevSnaps = new ArrayList<>(); // Will stay empty in case of bootstrap
+          ArrayList<String> prevSnaps = new ArrayList<>();
           if (isSnapshotEnabled) {
-            // Delete any old existing snapshot file, We always start fresh in case of bootstrap.
             FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),
-                new Path(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
-                    EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT));
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),

Review comment:
       made the change.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -254,23 +256,55 @@ public boolean canExecuteInParallel() {
     return true;
   }
 
-  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInformation proxyUser,
+  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInformation proxyUser, boolean isBootstrap,
       HiveConf clonedConf) throws IOException {
 
     DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, clonedConf);
     boolean result = false;
+    String snapPrefix = work.getSnapshotPrefix();
+    if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+      // in case of bootstrap replication from B to A (reverse replication), rename snapshots in A
+      // as they might have been renamed during dump in B
+      FileStatus[] listing = targetFs.listStatus(new Path(targetPath, ".snapshot"));
+      for (FileStatus elem : listing) {
+        String snapShotName = elem.getPath().getName();
+        String prefix;
+        if (snapShotName.contains(OLD_SNAPSHOT)) {
+          prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+          if (!prefix.equals(snapPrefix)) {
+            targetFs.renameSnapshot(targetPath, firstSnapshot(prefix), firstSnapshot(snapPrefix));
+          }
+        }
+        if (snapShotName.contains(NEW_SNAPSHOT)) {
+          prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+          if (!prefix.equals(snapPrefix)) {
+            targetFs.renameSnapshot(targetPath, secondSnapshot(prefix), secondSnapshot(snapPrefix));
+          }
+        }
+      }

Review comment:
       done.
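
The prefix handling in the hunk above can be tried without a cluster. What follows is only a minimal sketch, not the PR's code: the class `SnapshotRenamePlan` and its `plan` method are hypothetical, and the suffix values `replOld` and `replNew` are assumed stand-ins for `OLD_SNAPSHOT` and `NEW_SNAPSHOT`. It derives the prefix of each existing snapshot name and lists the renames that would bring it to the expected prefix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only. The suffix values are assumptions
// standing in for SnapshotUtils.OLD_SNAPSHOT and SnapshotUtils.NEW_SNAPSHOT.
class SnapshotRenamePlan {
  static final String OLD_SNAPSHOT = "replOld";
  static final String NEW_SNAPSHOT = "replNew";

  // Returns {from, to} pairs for snapshots whose prefix differs from expectedPrefix.
  static List<String[]> plan(List<String> snapshotNames, String expectedPrefix) {
    List<String[]> renames = new ArrayList<>();
    for (String name : snapshotNames) {
      for (String suffix : new String[] {OLD_SNAPSHOT, NEW_SNAPSHOT}) {
        // endsWith is a stricter match than the contains() check used in the hunk.
        if (name.endsWith(suffix)) {
          String prefix = name.substring(0, name.length() - suffix.length());
          if (!prefix.equals(expectedPrefix)) {
            renames.add(new String[] {name, expectedPrefix + suffix});
          }
        }
      }
    }
    return renames;
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("dbareplOld", "dbareplNew", "manual");
    for (String[] rename : plan(names, "dbb")) {
      System.out.println(rename[0] + " -> " + rename[1]);
    }
  }
}
```

Computing the plan separately from the `renameSnapshot` calls keeps the string logic testable; snapshots that do not carry either suffix are left alone.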






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r722352700



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,135 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
+      }
+      boolean isFirstSnapshotAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean isSecondSnapAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
+      //for bootstrap and non - failback case, use initial_copy
+      if(isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)) {
         // Delete any pre existing snapshots.
         SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
         SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+        ret.put(prefix, INITIAL_COPY);
+        return ret;
       }
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {

Review comment:
       Added.
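
The bootstrap condition in the hunk above, `isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)`, is easier to read once the inner term has a name. A minimal sketch, assuming the reading given by the hunk's own comment (first snapshot present and second absent is the failback case); the class and method names are hypothetical:

```java
// Hypothetical stand-alone sketch of the bootstrap decision; not the PR's code.
class BootstrapSnapshotDecision {
  // True when only the first (old) snapshot is present, which the hunk's
  // comment treats as the failback case.
  static boolean canReuseSnapshot(boolean firstAvailable, boolean secondAvailable) {
    return firstAvailable && !secondAvailable;
  }

  // Mirrors: isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)
  static boolean needsInitialCopy(boolean isBootstrap, boolean firstAvailable, boolean secondAvailable) {
    return isBootstrap && !canReuseSnapshot(firstAvailable, secondAvailable);
  }

  public static void main(String[] args) {
    boolean[] values = {false, true};
    for (boolean first : values) {
      for (boolean second : values) {
        System.out.println("first=" + first + " second=" + second
            + " initialCopy=" + needsInitialCopy(true, first, second));
      }
    }
  }
}
```

Of the four snapshot states during a bootstrap, only "first present, second absent" skips the initial copy.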

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -243,18 +244,25 @@ boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInfo
 
     DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, clonedConf);
     boolean result = false;
+    boolean isSecondSnapshotAvl =

Review comment:
       Renamed.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -243,18 +244,25 @@ boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInfo
 
     DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, clonedConf);
     boolean result = false;
+    boolean isSecondSnapshotAvl =
+            SnapshotUtils.isSnapshotAvailable(targetFs, targetPath, work.getSnapshotPrefix(), NEW_SNAPSHOT, clonedConf);
     if (getWork().getCopyMode().equals(SnapshotUtils.SnapshotCopyMode.DIFF_COPY)) {
+      if(isSecondSnapshotAvl) {

Review comment:
       Added.






[GitHub] [hive] ayushtkn commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ayushtkn commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r724067080



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1105,17 +1105,13 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           boolean isExternalTablePresent = false;
 
           String snapshotPrefix = dbName.toLowerCase();
-          ArrayList<String> prevSnaps = new ArrayList<>(); // Will stay empty in case of bootstrap
+          ArrayList<String> prevSnaps = new ArrayList<>();
           if (isSnapshotEnabled) {
-            // Delete any old existing snapshot file, We always start fresh in case of bootstrap.
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),
-                new Path(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
-                    EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT));
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),

Review comment:
       Check whether we need to preserve this.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -201,7 +218,10 @@ public void testBasicReplicationWithSnapshots() throws Throwable {
   public void testBasicStartFromIncrementalReplication() throws Throwable {
 
     // Run a cycle of dump & load with snapshot disabled.
-    ArrayList<String> withClause = new ArrayList<>(1);
+    ArrayList<String> withClause = new ArrayList<>(3);
+    ArrayList<String> withClause2 = new ArrayList<>(3);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    withClause2.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");

Review comment:
       Please check why this is added twice.
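
One way to avoid repeating the same `add` call on both lists is to build the shared entries once and copy them. This is only a sketch of that idea: `WithClauses` is a hypothetical test helper, not part of the Hive test suite, and the key `hive.repl.rootdir` in the usage note is assumed to be the value of `REPLDIR.varname`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical test helper for illustration; not part of the Hive test suite.
class WithClauses {
  // Formats one entry the way the test builds it: 'key'='value'
  static String entry(String key, String value) {
    return "'" + key + "'='" + value + "'";
  }

  // Returns a fresh mutable list seeded with the shared entries, so each
  // clause list can be extended independently afterwards.
  static List<String> copyOf(List<String> shared) {
    return new ArrayList<>(shared);
  }
}
```

A test could then create `shared` with a single `entry("hive.repl.rootdir", dir)` and obtain both lists through `copyOf(shared)`.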

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,138 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
+        // In case of reverse replication after failover, in some paths, second snapshot may not be present.
+        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumDeleted();
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();

Review comment:
       revert this

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
##########
@@ -275,17 +275,17 @@ public static void renameSnapshot(FileSystem fs, Path snapshotPath, String sourc
 
   /**
    *  Deletes the snapshots present in the list.
-   * @param dfs DistributedFileSystem.
    * @param diffList Elements to be deleted.
    * @param prefix Prefix used in snapshot names,
    * @param snapshotCount snapshot counter to track the number of snapshots deleted.
    * @param conf the Hive Configuration.
    * @throws IOException in case of any error.
    */
-  private static void cleanUpSnapshots(DistributedFileSystem dfs, ArrayList<String> diffList, String prefix,
+  private static void cleanUpSnapshots(ArrayList<String> diffList, String prefix,
       ReplSnapshotCount snapshotCount, HiveConf conf) throws IOException {
     for (String path : diffList) {
       Path snapshotPath = new Path(path);
+      DistributedFileSystem dfs = (DistributedFileSystem) snapshotPath.getFileSystem(conf);

Review comment:
       This creates the DFS inside the loop; it can be created only once.
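
If every path in `diffList` lives on the same file system, the lookup can simply move above the loop. Where paths may span namespaces, a small cache keyed by scheme and authority gives the same effect. The sketch below is an assumption-laden illustration: `HandleCache` is hypothetical, `H` stands in for a handle such as `DistributedFileSystem`, and the factory stands in for the `getFileSystem(conf)` call.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: H stands in for a file system handle such as
// DistributedFileSystem; the factory stands in for Path.getFileSystem(conf).
class HandleCache<H> {
  private final Map<String, H> handles = new HashMap<>();
  private final Function<URI, H> factory;

  HandleCache(Function<URI, H> factory) {
    this.factory = factory;
  }

  // Returns one handle per scheme://authority, creating it on first use only.
  H handleFor(String path) {
    URI uri = URI.create(path);
    String key = uri.getScheme() + "://" + uri.getAuthority();
    return handles.computeIfAbsent(key, k -> factory.apply(uri));
  }

  // Number of distinct handles created so far.
  int size() {
    return handles.size();
  }
}
```

Inside the loop, each path would then call `handleFor(path)` instead of resolving the file system again.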

##########
File path: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
##########
@@ -705,6 +705,16 @@ public static boolean distCpWithSnapshot(String oldSnapshot, String newSnapshot,
             oldSnapshot, newSnapshot);
     } catch (IOException e) {
       LOG.error("Can not copy using snapshot from source: {}, target: {}", srcPaths, dst);
+      try {
+        // in case overwriteTarget is set to false, and we encounter an exception due to targetFs getting
+        // changed since last snapshot, then fallback to initial copy
+        if (!overwriteTarget && !e.getCause().getMessage().contains("changed since snapshot")) {
+          LOG.warn("Diff copy failed due to changed target filesystem, falling back to normal distcp.");
+          return distCp(srcPaths.get(0).getFileSystem(conf), srcPaths, dst, false, proxyUser, conf, shims);

Review comment:
       Use the snapshot to copy; you can keep it where rdiff is being done.
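
Separately from where the fallback lives, the check in the hunk dereferences `e.getCause().getMessage()`, which fails if either the cause or its message is null. A null-safe variant might look like the sketch below. It follows the intent stated in the hunk's comment (fall back when the target changed); whether the negated `contains` check in the hunk is intended is not clear from this thread. The class and method names are hypothetical.

```java
// Hypothetical sketch of a null-safe check; the message fragment is the one
// quoted in the hunk above.
class DiffCopyFailure {
  static final String TARGET_CHANGED = "changed since snapshot";

  // Walks the cause chain and reports whether any message mentions the fragment.
  static boolean targetChanged(Throwable error) {
    for (Throwable t = error; t != null; t = t.getCause()) {
      String message = t.getMessage();
      if (message != null && message.contains(TARGET_CHANGED)) {
        return true;
      }
    }
    return false;
  }

  // Fall back only when the target must not be overwritten and it has diverged.
  static boolean shouldFallBack(boolean overwriteTarget, Throwable error) {
    return !overwriteTarget && targetChanged(error);
  }
}
```

Matching on message text is fragile by nature, so the fragment is kept in one constant.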






[GitHub] [hive] ArkoSharma closed pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma closed pull request #2539:
URL: https://github.com/apache/hive/pull/2539


   




[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r722353424



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -270,8 +278,9 @@ boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInfo
       // snapshots.
       SnapshotUtils.allowSnapshot(targetFs, work.getFullyQualifiedTargetPath(), clonedConf);
       // Attempt to delete the snapshot, in case this is a bootstrap post a failed incremental, Since in case of
-      // bootstrap we go from start, so delete any pre-existing snapshot.
+      // bootstrap we go from start, so delete any pre-existing snapshot, (both snapshots can exist in case of failback)

Review comment:
       Changed to read 'reverse replication'






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r726757597



##########
File path: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
##########
@@ -705,6 +705,16 @@ public static boolean distCpWithSnapshot(String oldSnapshot, String newSnapshot,
             oldSnapshot, newSnapshot);
     } catch (IOException e) {
       LOG.error("Can not copy using snapshot from source: {}, target: {}", srcPaths, dst);
+      try {
+        // in case overwriteTarget is set to false, and we encounter an exception due to targetFs getting
+        // changed since last snapshot, then fallback to initial copy
+        if (!overwriteTarget && !e.getCause().getMessage().contains("changed since snapshot")) {
+          LOG.warn("Diff copy failed due to changed target filesystem, falling back to normal distcp.");
+          return distCp(srcPaths.get(0).getFileSystem(conf), srcPaths, dst, false, proxyUser, conf, shims);

Review comment:
       Done.






[GitHub] [hive] aasha commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r690179615



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -243,18 +244,25 @@ boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInfo
 
     DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, clonedConf);
     boolean result = false;
+    boolean isSecondSnapshotAvl =
+            SnapshotUtils.isSnapshotAvailable(targetFs, targetPath, work.getSnapshotPrefix(), NEW_SNAPSHOT, clonedConf);
     if (getWork().getCopyMode().equals(SnapshotUtils.SnapshotCopyMode.DIFF_COPY)) {
+      if(isSecondSnapshotAvl) {

Review comment:
       Add a comment explaining why this is done.






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r759026088



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));

Review comment:
       done.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));

Review comment:
       done.






[GitHub] [hive] ayushtkn commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ayushtkn commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r724067080



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1105,17 +1105,13 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           boolean isExternalTablePresent = false;
 
           String snapshotPrefix = dbName.toLowerCase();
-          ArrayList<String> prevSnaps = new ArrayList<>(); // Will stay empty in case of bootstrap
+          ArrayList<String> prevSnaps = new ArrayList<>();
           if (isSnapshotEnabled) {
-            // Delete any old existing snapshot file, We always start fresh in case of bootstrap.
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),
-                new Path(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
-                    EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT));
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),

Review comment:
       Check whether we need to preserve this.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -201,7 +218,10 @@ public void testBasicReplicationWithSnapshots() throws Throwable {
   public void testBasicStartFromIncrementalReplication() throws Throwable {
 
     // Run a cycle of dump & load with snapshot disabled.
-    ArrayList<String> withClause = new ArrayList<>(1);
+    ArrayList<String> withClause = new ArrayList<>(3);
+    ArrayList<String> withClause2 = new ArrayList<>(3);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    withClause2.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");

Review comment:
       Please check why this is added twice.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,138 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
+        // In case of reverse replication after failover, in some paths, second snapshot may not be present.
+        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumDeleted();
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();

Review comment:
       revert this

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
##########
@@ -275,17 +275,17 @@ public static void renameSnapshot(FileSystem fs, Path snapshotPath, String sourc
 
   /**
    *  Deletes the snapshots present in the list.
-   * @param dfs DistributedFileSystem.
    * @param diffList Elements to be deleted.
    * @param prefix Prefix used in snapshot names,
    * @param snapshotCount snapshot counter to track the number of snapshots deleted.
    * @param conf the Hive Configuration.
    * @throws IOException in case of any error.
    */
-  private static void cleanUpSnapshots(DistributedFileSystem dfs, ArrayList<String> diffList, String prefix,
+  private static void cleanUpSnapshots(ArrayList<String> diffList, String prefix,
       ReplSnapshotCount snapshotCount, HiveConf conf) throws IOException {
     for (String path : diffList) {
       Path snapshotPath = new Path(path);
+      DistributedFileSystem dfs = (DistributedFileSystem) snapshotPath.getFileSystem(conf);

Review comment:
       This creates the DFS inside the loop; it can be created just once.
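
   A minimal sketch of one way to do that, assuming the paths in diffList may live on more than one cluster (presumably why the lookup moved into the loop). It uses plain Java with a hypothetical FsHandle stand-in rather than the real DistributedFileSystem, and all class and method names are illustrative only. The handle is resolved once per scheme and authority and reused for every later path.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FsHandleCache {
  /** Hypothetical stand-in for an expensive per-cluster handle such as a DistributedFileSystem. */
  static final class FsHandle {
    final String authority;
    FsHandle(String authority) { this.authority = authority; }
  }

  private final Map<String, FsHandle> handles = new HashMap<>();
  private int lookups = 0;

  /** Returns the cached handle for the path's scheme and authority, creating it on first use. */
  FsHandle handleFor(String path) {
    URI uri = URI.create(path);
    String key = uri.getScheme() + "://" + uri.getAuthority();
    return handles.computeIfAbsent(key, k -> {
      lookups++; // counts how often a new handle actually had to be resolved
      return new FsHandle(k);
    });
  }

  int lookups() { return lookups; }

  public static void main(String[] args) {
    FsHandleCache cache = new FsHandleCache();
    List<String> diffList = Arrays.asList(
        "hdfs://nn1:8020/warehouse/t1",
        "hdfs://nn1:8020/warehouse/t2",
        "hdfs://nn2:8020/warehouse/t3");
    for (String path : diffList) {
      cache.handleFor(path); // two of the three paths share one handle
    }
    System.out.println("paths=" + diffList.size() + ", handles resolved=" + cache.lookups());
  }
}
```

   In the real method, the same idea would mean keying a small map on the path's URI authority and calling snapshotPath.getFileSystem(conf) only on a miss.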

##########
File path: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
##########
@@ -705,6 +705,16 @@ public static boolean distCpWithSnapshot(String oldSnapshot, String newSnapshot,
             oldSnapshot, newSnapshot);
     } catch (IOException e) {
       LOG.error("Can not copy using snapshot from source: {}, target: {}", srcPaths, dst);
+      try {
+        // in case overwriteTarget is set to false, and we encounter an exception due to targetFs getting
+        // changed since last snapshot, then fallback to initial copy
+        if (!overwriteTarget && !e.getCause().getMessage().contains("changed since snapshot")) {
+          LOG.warn("Diff copy failed due to changed target filesystem, falling back to normal distcp.");
+          return distCp(srcPaths.get(0).getFileSystem(conf), srcPaths, dst, false, proxyUser, conf, shims);

Review comment:
       Use the snapshot to copy; you can keep this where the rdiff is being done.
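
   One possible reading of this suggestion, sketched under assumptions: when the diff copy fails, look through the whole cause chain for the "changed since snapshot" message (without assuming getCause() is non-null), and then copy from the snapshot path rather than from the live directory. The class and method names below are hypothetical. The only facts relied on are the message text from the quoted diff and the HDFS convention that a snapshot is readable under <dir>/.snapshot/<name>.

```java
import java.io.IOException;

class SnapshotFallbackSketch {
  /** Builds the read-only snapshot path for a snapshottable directory. */
  static String snapshotSource(String sourceDir, String snapshotName) {
    String base = sourceDir.endsWith("/")
        ? sourceDir.substring(0, sourceDir.length() - 1)
        : sourceDir;
    return base + "/.snapshot/" + snapshotName;
  }

  /** Walks the cause chain null-safely, looking for the message quoted in the diff. */
  static boolean targetChangedSinceSnapshot(Throwable error) {
    for (Throwable t = error; t != null; t = t.getCause()) {
      String message = t.getMessage();
      if (message != null && message.contains("changed since snapshot")) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Exception failure = new IOException("diff copy failed",
        new RuntimeException("The target has been changed since snapshot s1"));
    if (targetChangedSinceSnapshot(failure)) {
      // A full copy from the snapshot reads a point-in-time view of the source.
      System.out.println("fallback source: "
          + snapshotSource("hdfs://nn1:8020/ext/t1/", "db_new"));
    }
  }
}
```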






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r722350281



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,135 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
+      }
+      boolean isFirstSnapshotAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean isSecondSnapAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
+      //for bootstrap and non - failback case, use initial_copy
+      if(isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)) {

Review comment:
       Considered making this change, but realised it would then require a similar listing and name check for snapshots on the target side. This can occur in reverse replication after failover, when the source and target databases have different names.
   Hence decided to proceed with this implementation, as it allows identifying which snapshots are being reused. It also makes sense to include this work in dump, since dump should generally take less time than load (except possibly when the db contains only external tables and the data copy runs on the source).






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r759005160



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {

Review comment:
       This case is included in the following block.
   
   (!secondSnapAvailable && firstSnapAvailable) - this condition denotes reverse replication taking place for the first time for a particular path. Execution is the same for both incremental and bootstrap: we either reuse the snapshots or not, depending on the conf. For this reason, the case is handled together for both.
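
   The condition can be restated as a pair of small predicates. This is only a sketch of the check described here, in plain Java with hypothetical class and method names; it does not reproduce the rest of createSnapshotsAtSource.

```java
class ReverseReplicationCheck {
  /** Only the first (old) snapshot exists: first reverse-replication cycle for this path. */
  static boolean isFirstReverseCycle(boolean firstSnapAvailable, boolean secondSnapAvailable) {
    return firstSnapAvailable && !secondSnapAvailable;
  }

  /** Mirrors the quoted guard: isBootstrap && !(!secondSnapAvailable && firstSnapAvailable). */
  static boolean takesBootstrapForwardBranch(boolean isBootstrap,
                                             boolean firstSnapAvailable,
                                             boolean secondSnapAvailable) {
    return isBootstrap && !isFirstReverseCycle(firstSnapAvailable, secondSnapAvailable);
  }

  public static void main(String[] args) {
    boolean[] values = {false, true};
    // Print the truth table for a bootstrap dump.
    for (boolean first : values) {
      for (boolean second : values) {
        System.out.printf("first=%b second=%b -> firstReverseCycle=%b bootstrapForward=%b%n",
            first, second,
            isFirstReverseCycle(first, second),
            takesBootstrapForwardBranch(true, first, second));
      }
    }
  }
}
```

   Of the four combinations, only first=true, second=false skips the bootstrap/forward branch and falls through to the shared block.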






[GitHub] [hive] ayushtkn commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ayushtkn commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r689459394



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,135 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
+      }
+      boolean isFirstSnapshotAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean isSecondSnapAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
+      //for bootstrap and non - failback case, use initial_copy
+      if(isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)) {

Review comment:
       Is all this just to find out the prefix, which is always the database name? This now seems to be done even for incremental.
   Can't we just rename the snapshots as per the new naming standard in case of controlled failover, use the normal flow after that, and get rid of maintaining the prefix?
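
   A sketch of what such a rename step could look like, under assumptions: the suffix constants below are placeholders (the real OLD_SNAPSHOT and NEW_SNAPSHOT values are not shown in this diff), the class and method names are hypothetical, and the listing is a plain list of snapshot names rather than a FileStatus[] from HDFS. It only computes which renames would be needed; applying them would go through the DFS rename call.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SnapshotRenamePlan {
  // Placeholder suffixes, NOT the real SnapshotUtils constants.
  static final String OLD_SUFFIX = "_old";
  static final String NEW_SUFFIX = "_new";

  /** Maps each replication snapshot whose prefix differs from expectedPrefix to its new name. */
  static Map<String, String> plan(List<String> snapshotNames, String expectedPrefix) {
    Map<String, String> renames = new LinkedHashMap<>();
    for (String name : snapshotNames) {
      for (String suffix : new String[] {OLD_SUFFIX, NEW_SUFFIX}) {
        if (!name.endsWith(suffix)) {
          continue; // not a replication snapshot of this kind
        }
        String prefix = name.substring(0, name.length() - suffix.length());
        if (!prefix.equals(expectedPrefix)) {
          renames.put(name, expectedPrefix + suffix);
        }
      }
    }
    return renames;
  }

  public static void main(String[] args) {
    // Snapshots left behind by forward replication of "srcdb", now dumped as "tgtdb".
    List<String> listing = Arrays.asList("srcdb_old", "srcdb_new", "manual_backup");
    System.out.println(plan(listing, "tgtdb"));
  }
}
```

   The sketch matches on endsWith rather than contains, so a snapshot that merely has the marker somewhere in the middle of its name is left alone. That is a choice made for this example, not something taken from the patch.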

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,135 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
+      }
+      boolean isFirstSnapshotAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean isSecondSnapAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
+      //for bootstrap and non - failback case, use initial_copy
+      if(isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)) {
         // Delete any pre existing snapshots.
         SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
         SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+        ret.put(prefix, INITIAL_COPY);
+        return ret;
       }
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {

Review comment:
       Is prevSnaps initialised for bootstrap? Better to add a test as well for the failure and checkpointing flow.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -79,11 +80,11 @@ public static void classLevelSetup() throws Exception {
     overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
     overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
         UserGroupInformation.getCurrentUser().getUserName());
-    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
     overrides.put(HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname, "true");
     overrides.put(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY.varname, "true");
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationScenarios.class);

Review comment:
       Why do we need this? All the tests should work as is, without disturbing existing tests.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -103,38 +109,49 @@ static void internalBeforeClassSetup(Map<String, String> overrides,
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("metastore.warehouse.tenant.colocation", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    String primaryBaseDir = Files.createTempDirectory("primary").toFile().getAbsolutePath();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir);
+    replicaConf = new HiveConf(clazz);
+    replicaConf.set("dfs.client.use.datanode.hostname", "true");
+    replicaConf.set("metastore.warehouse.tenant.colocation", "true");
+    replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    String replicaBaseDir = Files.createTempDirectory("replica").toFile().getAbsolutePath();
+    replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
     MiniDFSCluster miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+            new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+    MiniDFSCluster miniDFSClusterReplica =
+            new MiniDFSCluster.Builder(replicaConf).numDataNodes(2).format(true).build();
     Map<String, String> acidEnableConf = new HashMap<String, String>() {{
-        put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-        put("hive.support.concurrency", "true");
-        put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.metastore.client.capability.check", "false");
-        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
-        put("hive.strict.checks.bucketing", "false");
-        put("hive.mapred.mode", "nonstrict");
-        put("mapred.input.dir.recursive", "true");
-        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
-        put("metastore.warehouse.tenant.colocation", "true");
-        put("hive.in.repl.test", "true");
-        put("hive.txn.readonly.enabled", "true");
-        //HIVE-25267
-        put(MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT.getVarname(), "2000");
-        put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
-        put(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET.varname, "false");
-      }};
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("metastore.warehouse.tenant.colocation", "true");
+      put("hive.in.repl.test", "true");
+      put("hive.txn.readonly.enabled", "true");

Review comment:
       nit:
   Looks like only an indentation change; avoid unrelated changes.






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r727865312



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -103,38 +109,49 @@ static void internalBeforeClassSetup(Map<String, String> overrides,
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("metastore.warehouse.tenant.colocation", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    String primaryBaseDir = Files.createTempDirectory("primary").toFile().getAbsolutePath();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir);
+    replicaConf = new HiveConf(clazz);
+    replicaConf.set("dfs.client.use.datanode.hostname", "true");
+    replicaConf.set("metastore.warehouse.tenant.colocation", "true");
+    replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    String replicaBaseDir = Files.createTempDirectory("replica").toFile().getAbsolutePath();
+    replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
     MiniDFSCluster miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+            new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+    MiniDFSCluster miniDFSClusterReplica =
+            new MiniDFSCluster.Builder(replicaConf).numDataNodes(2).format(true).build();
     Map<String, String> acidEnableConf = new HashMap<String, String>() {{
-        put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-        put("hive.support.concurrency", "true");
-        put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.metastore.client.capability.check", "false");
-        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
-        put("hive.strict.checks.bucketing", "false");
-        put("hive.mapred.mode", "nonstrict");
-        put("mapred.input.dir.recursive", "true");
-        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
-        put("metastore.warehouse.tenant.colocation", "true");
-        put("hive.in.repl.test", "true");
-        put("hive.txn.readonly.enabled", "true");
-        //HIVE-25267
-        put(MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT.getVarname(), "2000");
-        put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
-        put(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET.varname, "false");
-      }};
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("metastore.warehouse.tenant.colocation", "true");
+      put("hive.in.repl.test", "true");
+      put("hive.txn.readonly.enabled", "true");

Review comment:
       Done.






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r759005160



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {

Review comment:
       This case is covered by the following block (as noted in the code comment).
   
   `(!secondSnapAvailable && firstSnapAvailable)` denotes reverse replication taking place for the first time for a particular path. Execution is then the same for incremental and bootstrap: we either reuse the snapshots or not, depending on the conf. That is why the two cases are handled together.
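
   To make the branch condition concrete, here is a minimal standalone sketch in plain Java (no Hadoop or Hive types). The class and method names are hypothetical and not part of the patch; only the boolean expression is copied from the diff. By De Morgan's law the guard is equivalent to `isBootstrap && (secondSnapAvailable || !firstSnapAvailable)`, so the only bootstrap state that skips this branch is "first snapshot present, second absent", which is the first-time reverse replication case described above.

```java
// Hypothetical helper, for illustration only; not part of HIVE-25397.
final class BootstrapBranchSketch {

  // The guard exactly as it appears in the diff.
  static boolean takesBootstrapBranch(boolean isBootstrap,
                                      boolean firstSnapAvailable,
                                      boolean secondSnapAvailable) {
    return isBootstrap && !(!secondSnapAvailable && firstSnapAvailable);
  }

  // The same guard after applying De Morgan's law to the negated term.
  static boolean takesBootstrapBranchSimplified(boolean isBootstrap,
                                                boolean firstSnapAvailable,
                                                boolean secondSnapAvailable) {
    return isBootstrap && (secondSnapAvailable || !firstSnapAvailable);
  }

  // The state excluded by the guard: only the first snapshot exists,
  // i.e. reverse replication is running for the first time on this path.
  static boolean isFirstReverseReplication(boolean firstSnapAvailable,
                                           boolean secondSnapAvailable) {
    return firstSnapAvailable && !secondSnapAvailable;
  }
}
```

   Written in the simplified form, the guard may be easier to read in the patch itself, but that is a style choice rather than a behavior change.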






[GitHub] [hive] aasha commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r690179449



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -243,18 +244,25 @@ boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInfo
 
     DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, clonedConf);
     boolean result = false;
+    boolean isSecondSnapshotAvl =

Review comment:
       Please rename these to more readable variable names.






[GitHub] [hive] ayushtkn commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ayushtkn commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r680700839



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,54 +192,130 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
+  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
       boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
       ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
+    DistributedFileSystem targetDfs = SnapshotUtils.getDFS(targetPath, conf);

Review comment:
       We cannot use the targetFS during the dump; it won't be accessible in some cases. We should use the targetFS only during the copy phase.






[GitHub] [hive] ayushtkn commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ayushtkn commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r734993099



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -593,6 +642,128 @@ public void testFailureScenarios() throws Throwable {
         .verifyResults(new String[] {"delhi", "noida"});
   }
 
+  /*
+   * test to check reuse of diff snapshots when incremental fails with irrecoverable error during data-copy (target modified)
+   * and re-bootstrap is required but overwrite is off.
+   */
+  @Test
+  public void testRebootstrapDiffCopy() throws Throwable {
+
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    DistributedFileSystem fsTarget = replica.miniDFSCluster.getFileSystem();
+    Path externalTableLocation1 = new Path("/" + testName.getMethodName() + "/table1/");
+    fs.mkdirs(externalTableLocation1, new FsPermission("777"));
+
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + externalTableLocation1
+            .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + "'");
+
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create external table table1 (place string) partitioned by (country string) row format "
+                    + "delimited fields terminated by ',' location '" + externalTableLocation1.toString() + "'")
+            .run("create external table table2 (id int)")
+            .run("create external table table3 (id int)")
+            .run("insert into table1 partition(country='nepal') values ('kathmandu')")
+            .run("insert into table1 partition(country='china') values ('beejing')")
+            .run("insert into table2 values(1)")
+            .run("insert into table3 values(5)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 'table1'")
+            .verifyResults(new String[] {"table1"})
+            .run("select place from table1 where country='nepal'")
+            .verifyResults(new String[] {"kathmandu"})
+            .run("select place from table1 where country='china'")
+            .verifyResults(new String[] {"beejing"})
+            .run("select id from table3")
+            .verifyResults(new String[]{"5"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"1"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Check if the table1 directory is snapshotoble and the snapshot is there.
+    validateInitialSnapshotsCreated(externalTableLocation1.toString());
+
+    // Add some more data and do a dump & load
+    primary.run("use " + primaryDbName)
+            .run("insert into table1 partition(country='china') values ('wuhan')")
+            .run("insert into table2 values(2)")
+            .run("insert into table3 values(6)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("select place from table1 where country='china'")
+            .verifyResults(new String[] {"beejing", "wuhan"})
+            .run("select id from table3")
+            .verifyResults(new String[]{"5", "6"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"1", "2"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Verify if diff snapshots is there.
+    validateDiffSnapshotsCreated(externalTableLocation1.toString());
+
+    Path targetWhPath = externalTableDataPath(replicaConf, REPLICA_EXTERNAL_BASE,
+            new Path(primary.getDatabase(primaryDbName).getLocationUri()));
+    DistributedFileSystem replicaDfs = (DistributedFileSystem) targetWhPath.getFileSystem(replicaConf);
+
+    // Emulate the situation of a rebootstrap with incomplete data copied in the previous incremental cycle for some paths
+    // a. add some data to some paths
+    // b. do a dump load with snapshot disabled
+    // c. Now, some paths should have outdated snapshots in both source and target.
+    //    Re-enable snapshot and check whether diff-copy takes place for a fresh bootstrap
+    withClause.add("'hive.repl.externaltable.snapshotdiff.copy'='false'");
+    tuple = primary.run("use " + primaryDbName)
+            .run("insert into table table3 values (7)")
+            .run("insert into table table2 values (3)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("select id from table3")
+            .verifyResults(new String[]{"5", "6", "7"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"1", "2", "3"});
+
+    replica.run("use " + replicatedDbName)
+            .run("drop table table1")
+            .run("drop table table2")
+            .run("drop table table3")
+            .run("drop database "+ replicatedDbName + " cascade");

Review comment:
       `drop database ... cascade` would have dropped the tables as well, right?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));

Review comment:
       Should use the retryable renameSnapshot from SnapshotUtils
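
   As a rough illustration of what wrapping a rename in a retry buys, here is a minimal sketch in plain Java. It is not the SnapshotUtils helper and does not use Hive's retry classes; `RetrySketch`, `IoAction` and `runWithRetry` are hypothetical names, and the sketch retries immediately, whereas a production helper would normally wait between attempts and retry only on errors known to be transient.

```java
import java.io.IOException;

// Hypothetical retry wrapper, for illustration only.
final class RetrySketch {

  // An action that may fail with an IOException, such as a snapshot rename.
  @FunctionalInterface
  interface IoAction {
    void run() throws IOException;
  }

  // Runs the action up to maxAttempts times and returns the attempt number
  // that succeeded. Rethrows the last IOException if every attempt fails.
  static int runWithRetry(IoAction action, int maxAttempts) throws IOException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    IOException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        action.run();
        return attempt;
      } catch (IOException e) {
        last = e; // remember the failure and try again
      }
    }
    throw last;
  }
}
```

   Under these assumptions, the direct `sourceDfs.renameSnapshot(...)` call in the hunk would be passed in as the action, so a transient NameNode error would not fail the whole dump on the first attempt.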

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -254,23 +256,55 @@ public boolean canExecuteInParallel() {
     return true;
   }
 
-  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInformation proxyUser,
+  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInformation proxyUser, boolean isBootstrap,
       HiveConf clonedConf) throws IOException {
 
     DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, clonedConf);
     boolean result = false;
+    String snapPrefix = work.getSnapshotPrefix();
+    if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+      // in case of bootstrap replication from B to A (reverse replication), rename snapshots in A
+      // as they might have been renamed during dump in B
+      FileStatus[] listing = targetFs.listStatus(new Path(targetPath, ".snapshot"));
+      for (FileStatus elem : listing) {
+        String snapShotName = elem.getPath().getName();
+        String prefix;
+        if (snapShotName.contains(OLD_SNAPSHOT)) {
+          prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+          if (!prefix.equals(snapPrefix)) {
+            targetFs.renameSnapshot(targetPath, firstSnapshot(prefix), firstSnapshot(snapPrefix));
+          }
+        }
+        if (snapShotName.contains(NEW_SNAPSHOT)) {
+          prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+          if (!prefix.equals(snapPrefix)) {
+            targetFs.renameSnapshot(targetPath, secondSnapshot(prefix), secondSnapshot(snapPrefix));
+          }
+        }
+      }

Review comment:
       This seems to duplicate the one in ReplExternalTables; it can be refactored into a method in SnapshotUtils.
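
   One possible shape for such a shared helper, sketched in plain Java without Hadoop types: a pure function that takes the snapshot names found under `.snapshot` and returns the renames needed to move them to the current prefix. Everything here is an assumption for illustration: the class and method names are hypothetical, the suffix values stand in for `OLD_SNAPSHOT` and `NEW_SNAPSHOT`, and the match uses `endsWith` rather than the `contains` plus `lastIndexOf` combination in the diff, which is a deliberate deviation so that a suffix appearing in the middle of a name is not picked up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not the SnapshotUtils API.
final class SnapshotRenamePlanSketch {

  // Assumed stand-ins for the OLD_SNAPSHOT / NEW_SNAPSHOT constants.
  static final String OLD_SUFFIX = "replOld";
  static final String NEW_SUFFIX = "replNew";

  // Returns {currentName, desiredName} pairs for every replication snapshot
  // whose prefix differs from wantedPrefix. Other snapshots are ignored.
  static List<String[]> planRenames(List<String> snapshotNames, String wantedPrefix) {
    List<String[]> renames = new ArrayList<>();
    for (String name : snapshotNames) {
      for (String suffix : new String[] {OLD_SUFFIX, NEW_SUFFIX}) {
        if (name.endsWith(suffix)) {
          String prefix = name.substring(0, name.length() - suffix.length());
          if (!prefix.equals(wantedPrefix)) {
            renames.add(new String[] {name, wantedPrefix + suffix});
          }
        }
      }
    }
    return renames;
  }
}
```

   Both call sites (dump on the source, copy on the target) could then list their own `.snapshot` directory and apply the returned pairs through the same retryable rename, which keeps the file-system access in the caller and the naming logic in one place.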

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,

Review comment:
       ``targetPath`` is unused

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));

Review comment:
       Should use the retryable renameSnapshot from SnapshotUtils

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1105,17 +1105,15 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           boolean isExternalTablePresent = false;
 
           String snapshotPrefix = dbName.toLowerCase();
-          ArrayList<String> prevSnaps = new ArrayList<>(); // Will stay empty in case of bootstrap
+          ArrayList<String> prevSnaps = new ArrayList<>();
           if (isSnapshotEnabled) {
-            // Delete any old existing snapshot file, We always start fresh in case of bootstrap.
             FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),
-                new Path(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
-                    EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT));
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),

Review comment:
       If reuse snapshot is not turned on, we should clean up, right? Then we don't need the current file either, since we will always go with the initial copy?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {
+          // for normal bootstrap, use initial_copy
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumCreated();
+          }
+          allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
+          return INITIAL_COPY;
+        }
+      }
+
+      /*
+      * We have 4 cases :
+      * i.   both old and new snapshots exist in src -
+      *        a. In case of bootstrap -   it must be a re-bootstrap case where incremental failed earlier. Handled above.
+      *        b. In case of incremental - denotes normal incremental flow, we delete old snapshot,
+      *                                    rename the new as old and create a new 'new-snapshot'. No changes required in target.
+      * ii.  only new snapshot exists in src -
+      *        a. In case of incremental - denotes a path where initial copy would have been done in the previous
+      *                                    iteration, we rename it as 'old' and create a new 'new-snapshot'.
+      *                                    No changes required in target.
+      *        b. in case of bootstrap, it must be a re-bootstrap case. Handled above.
+      * iii. only old snapshot exists in src - this can occur during B(src)->A(tgt) replication after failover.
+      *      If reuse snapshots conf is true, attempt to reuse the snapshots :
+      *           Assume both snapshots exist in tgt (A). Deleting the old one and renaming the new as old will
+      *           be done during load.
+      *      Else proceed with initial copy.
+      * iv.  none exist - new path added in conf, need to do initial copy.
+      * */
+
+      if (secondSnapAvailable) {
+        if(firstSnapAvailable) {
+          sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
+          replSnapshotCount.incrementNumDeleted();
+          SnapshotUtils.renameSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix), conf);
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
+          replSnapshotCount.incrementNumCreated();
+          snapPathFileList.add(sourcePath.toString());
+          return DIFF_COPY;
+        } else {
+          //only new snapshot exists
           sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix));

Review comment:
       Can you replace this with `renameSnapshot` from `SnapshotUtils`?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {

Review comment:
       Shouldn't this else block always be executed for a bootstrap when reuse is false, irrespective of the `!(!secondSnapAvailable && firstSnapAvailable)` guard?
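
To make the question concrete, here is a minimal, self-contained sketch of just the guard condition from the hunk above. It is not the Hive code: the class name, the `Branch` labels and the `branchFor` helper are hypothetical, the snapshot and filesystem calls are left out, and the earlier `prevSnaps` early return is ignored. Only the boolean expression is copied from the diff.

```java
public class BootstrapGuardSketch {

    /** Illustrative labels for where a call to createSnapshotsAtSource would end up. */
    enum Branch { REUSE_BRANCH, NORMAL_BOOTSTRAP_BRANCH, FALLS_THROUGH }

    static Branch branchFor(boolean isBootstrap, boolean reuseSnapshots,
                            boolean firstSnapAvailable, boolean secondSnapAvailable) {
        // Same condition as the guard in the quoted diff.
        if (isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
            // Inside the guarded block, the reuse flag picks the if or the else branch.
            return reuseSnapshots ? Branch.REUSE_BRANCH : Branch.NORMAL_BOOTSTRAP_BRANCH;
        }
        // Guard not satisfied: control continues to the case analysis further down.
        return Branch.FALLS_THROUGH;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        // Enumerate every snapshot state for a bootstrap with reuse disabled.
        for (boolean first : flags) {
            for (boolean second : flags) {
                System.out.printf("bootstrap, reuse=false, old=%b, new=%b -> %s%n",
                        first, second, branchFor(true, false, first, second));
            }
        }
    }
}
```

Under these assumptions, the only bootstrap state with reuse disabled that skips the else block is "old snapshot present, new snapshot absent", which is the case the question is about.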

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;

Review comment:
       Can this use `firstSnapAvailable` instead?






[GitHub] [hive] ArkoSharma closed pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma closed pull request #2539:
URL: https://github.com/apache/hive/pull/2539


   




[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r722352460



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -79,11 +80,11 @@ public static void classLevelSetup() throws Exception {
     overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
     overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
         UserGroupInformation.getCurrentUser().getUserName());
-    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
     overrides.put(HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname, "true");
     overrides.put(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY.varname, "true");
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationScenarios.class);

Review comment:
       The tests are modified to use different filesystems for source and target. A few wrong-FS errors were fixed, and verifying those fixes required testing with a separate filesystem on each side.






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r759026376



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;

Review comment:
       Done.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {
+          // for normal bootstrap, use initial_copy
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumCreated();
+          }
+          allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
+          return INITIAL_COPY;
+        }
+      }
+
+      /*
+      * We have 4 cases :
+      * i.   both old and new snapshots exist in src -
+      *        a. In case of bootstrap -   it must be a re-bootstrap case where incremental failed earlier. Handled above.
+      *        b. In case of incremental - denotes normal incremental flow, we delete old snapshot,
+      *                                    rename the new as old and create a new 'new-snapshot'. No changes required in target.
+      * ii.  only new snapshot exists in src -
+      *        a. In case of incremental - denotes a path where initial copy would have been done in the previous
+      *                                    iteration, we rename it as 'old' and create a new 'new-snapshot'.
+      *                                    No changes required in target.
+      *        b. in case of bootstrap, it must be a re-bootstrap case. Handled above.
+      * iii. only old snapshot exists in src - this can occur during B(src)->A(tgt) replication after failover.
+      *      If reuse snapshots conf is true, attempt to reuse the snapshots :
+      *           Assume both snapshots exist in tgt (A). Deleting the old one and renaming the new as old will
+      *           be done during load.
+      *      Else proceed with initial copy.
+      * iv.  none exist - new path added in conf, need to do initial copy.
+      * */
+
+      if (secondSnapAvailable) {
+        if(firstSnapAvailable) {
+          sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
+          replSnapshotCount.incrementNumDeleted();
+          SnapshotUtils.renameSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix), conf);
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
+          replSnapshotCount.incrementNumCreated();
+          snapPathFileList.add(sourcePath.toString());
+          return DIFF_COPY;
+        } else {
+          //only new snapshot exists
           sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix));

Review comment:
       Done.
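
The four cases listed in the code comment of the hunk above can be read as a small decision function. The sketch below is only an illustration, not the code in ReplExternalTables: the class and method names are hypothetical, the snapshot create/delete/rename side effects are left out, and the branch for resuming a failed replication (the prevSnaps check) is omitted. The hunk is cut off before cases ii.a and iii return, so the copy modes chosen for those two cases are assumptions inferred from the comment text.

```java
public class SnapshotCopyModeSketch {

  /** Copy modes named in the diff; FALLBACK_COPY is left out because snapshots are assumed enabled. */
  enum CopyMode { INITIAL_COPY, DIFF_COPY }

  /**
   * Hypothetical helper mirroring the branch order of the hunk.
   *
   * @param isBootstrap    true for a bootstrap dump, false for incremental
   * @param reuseSnapshots value of hive.repl.reuse.snapshots
   * @param oldExists      the 'old' (first) snapshot exists at the source
   * @param newExists      the 'new' (second) snapshot exists at the source
   */
  static CopyMode decide(boolean isBootstrap, boolean reuseSnapshots,
                         boolean oldExists, boolean newExists) {
    boolean onlyOld = oldExists && !newExists;

    // Bootstrap, except the "only old snapshot exists" state that follows a failover.
    if (isBootstrap && !onlyOld) {
      if (reuseSnapshots) {
        // Re-bootstrap that reuses snapshots: a diff is possible only if 'old' is present.
        return oldExists ? CopyMode.DIFF_COPY : CopyMode.INITIAL_COPY;
      }
      // Normal bootstrap: start fresh.
      return CopyMode.INITIAL_COPY;
    }

    if (newExists) {
      // Case i.b (both exist) returns DIFF_COPY in the hunk.
      // Case ii.a (only 'new' exists) is assumed to return DIFF_COPY as well.
      return CopyMode.DIFF_COPY;
    }

    if (oldExists) {
      // Case iii (only 'old' exists): assumed to be a diff copy when reuse is enabled.
      return reuseSnapshots ? CopyMode.DIFF_COPY : CopyMode.INITIAL_COPY;
    }

    // Case iv: no snapshots, e.g. a newly added path.
    return CopyMode.INITIAL_COPY;
  }

  public static void main(String[] args) {
    System.out.println("normal bootstrap, both exist: " + decide(true, false, true, true));
    System.out.println("incremental, both exist:      " + decide(false, false, true, true));
    System.out.println("incremental, none exist:      " + decide(false, false, false, false));
  }
}
```

Keeping the bootstrap check first matches the hunk, where a normal bootstrap deletes both snapshots and always falls back to an initial copy.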






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r726758141



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,135 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
+      }
+      boolean isFirstSnapshotAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean isSecondSnapAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
+      //for bootstrap and non - failback case, use initial_copy
+      if(isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)) {

Review comment:
       Made the change on the assumption that the conf with singlePaths is not modified for reverse bootstrap (i.e. for reverse replication after failover), which removes the need to do the same during incremental.






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r726757091



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -201,7 +218,10 @@ public void testBasicReplicationWithSnapshots() throws Throwable {
   public void testBasicStartFromIncrementalReplication() throws Throwable {
 
     // Run a cycle of dump & load with snapshot disabled.
-    ArrayList<String> withClause = new ArrayList<>(1);
+    ArrayList<String> withClause = new ArrayList<>(3);
+    ArrayList<String> withClause2 = new ArrayList<>(3);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    withClause2.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");

Review comment:
       Changed to use a single withClause list.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
##########
@@ -275,17 +275,17 @@ public static void renameSnapshot(FileSystem fs, Path snapshotPath, String sourc
 
   /**
    *  Deletes the snapshots present in the list.
-   * @param dfs DistributedFileSystem.
    * @param diffList Elements to be deleted.
    * @param prefix Prefix used in snapshot names,
    * @param snapshotCount snapshot counter to track the number of snapshots deleted.
    * @param conf the Hive Configuration.
    * @throws IOException in case of any error.
    */
-  private static void cleanUpSnapshots(DistributedFileSystem dfs, ArrayList<String> diffList, String prefix,
+  private static void cleanUpSnapshots(ArrayList<String> diffList, String prefix,
       ReplSnapshotCount snapshotCount, HiveConf conf) throws IOException {
     for (String path : diffList) {
       Path snapshotPath = new Path(path);
+      DistributedFileSystem dfs = (DistributedFileSystem) snapshotPath.getFileSystem(conf);

Review comment:
       Done.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,138 @@ private void dirLocationToCopy(String tableName, FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
+                                                                              ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, ".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
+        // In case of reverse replication after failover, in some paths, second snapshot may not be present.
+        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumDeleted();
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();

Review comment:
       Done.
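
The prefix recovery in the ReplExternalTables hunk above (used when hive.repl.reuse.snapshots is set) can be tried in isolation. This is a minimal sketch, assuming snapshot names have the form prefix + suffix; the class name, the method name and the two suffix values below are assumptions for illustration and are not taken from this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SnapshotPrefixSketch {

  // Assumed suffix values; the real constants live in SnapshotUtils.
  static final String OLD_SNAPSHOT = "replOld";
  static final String NEW_SNAPSHOT = "replNew";

  /**
   * Returns the prefix of the first listed snapshot that carries one of the
   * replication suffixes, or defaultPrefix when no such snapshot is listed.
   */
  static String recoverPrefix(List<String> snapshotNames, String defaultPrefix) {
    for (String name : snapshotNames) {
      if (name.contains(OLD_SNAPSHOT)) {
        return name.substring(0, name.lastIndexOf(OLD_SNAPSHOT));
      }
      if (name.contains(NEW_SNAPSHOT)) {
        return name.substring(0, name.lastIndexOf(NEW_SNAPSHOT));
      }
    }
    return defaultPrefix;
  }

  public static void main(String[] args) {
    // A path previously snapshotted under the prefix "salesdb".
    List<String> listing = Arrays.asList("salesdb" + OLD_SNAPSHOT, "salesdb" + NEW_SNAPSHOT);
    System.out.println(recoverPrefix(listing, "otherdb"));
    // No snapshots present: the caller's own prefix is kept.
    System.out.println(recoverPrefix(Collections.<String>emptyList(), "otherdb"));
  }
}
```

As in the hunk, the first matching snapshot name decides the prefix, so a directory holding snapshots from more than one prefix would need extra handling.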






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r727865312



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -103,38 +109,49 @@ static void internalBeforeClassSetup(Map<String, String> overrides,
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("metastore.warehouse.tenant.colocation", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    String primaryBaseDir = Files.createTempDirectory("primary").toFile().getAbsolutePath();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir);
+    replicaConf = new HiveConf(clazz);
+    replicaConf.set("dfs.client.use.datanode.hostname", "true");
+    replicaConf.set("metastore.warehouse.tenant.colocation", "true");
+    replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    String replicaBaseDir = Files.createTempDirectory("replica").toFile().getAbsolutePath();
+    replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
     MiniDFSCluster miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+            new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+    MiniDFSCluster miniDFSClusterReplica =
+            new MiniDFSCluster.Builder(replicaConf).numDataNodes(2).format(true).build();
     Map<String, String> acidEnableConf = new HashMap<String, String>() {{
-        put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-        put("hive.support.concurrency", "true");
-        put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.metastore.client.capability.check", "false");
-        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
-        put("hive.strict.checks.bucketing", "false");
-        put("hive.mapred.mode", "nonstrict");
-        put("mapred.input.dir.recursive", "true");
-        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
-        put("metastore.warehouse.tenant.colocation", "true");
-        put("hive.in.repl.test", "true");
-        put("hive.txn.readonly.enabled", "true");
-        //HIVE-25267
-        put(MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT.getVarname(), "2000");
-        put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "false");
-        put(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET.varname, "false");
-      }};
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("metastore.warehouse.tenant.colocation", "true");
+      put("hive.in.repl.test", "true");
+      put("hive.txn.readonly.enabled", "true");

Review comment:
       Done.






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r726761398



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1105,17 +1105,13 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           boolean isExternalTablePresent = false;
 
           String snapshotPrefix = dbName.toLowerCase();
-          ArrayList<String> prevSnaps = new ArrayList<>(); // Will stay empty in case of bootstrap
+          ArrayList<String> prevSnaps = new ArrayList<>();
           if (isSnapshotEnabled) {
-            // Delete any old existing snapshot file, We always start fresh in case of bootstrap.
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),
-                new Path(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
-                    EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT));
-            FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot), conf),

Review comment:
       The 'current' one needs to be preserved so that snapshots can be reused when bootstrap is resumed from the same directory (the case discussed above). The 'old' one has been deleted.






[GitHub] [hive] aasha commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r690180325



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -270,8 +278,9 @@ boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, UserGroupInfo
       // snapshots.
       SnapshotUtils.allowSnapshot(targetFs, work.getFullyQualifiedTargetPath(), clonedConf);
       // Attempt to delete the snapshot, in case this is a bootstrap post a failed incremental, Since in case of
-      // bootstrap we go from start, so delete any pre-existing snapshot.
+      // bootstrap we go from start, so delete any pre-existing snapshot, (both snapshots can exist in case of failback)

Review comment:
       Do you mean failover?






[GitHub] [hive] ArkoSharma commented on a change in pull request #2539: HIVE-25397: snapshot support for controlled failover

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r767480983



##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -695,6 +695,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         + "data copy, the target data is overwritten and the modifications are removed and the copy is again "
         + "attempted using the snapshot based approach. If disabled, the replication will fail in case the target is "
         + "modified."),
+    REPL_REUSE_SNAPSHOTS("hive.repl.reuse.snapshots", false,
+        "If enabled,reusing snapshots is attempted in case of controlled failover(B->A) when same paths are"
+        + "used for external table replication on src and target. Also in cases of failed incremental where re-bootstrap is required."
+        + "If set to true and snapshots exist in some paths, it creates/reuses new snapshots in those paths using the same name as exisiting snapshots."),

Review comment:
       Corrected

##########
File path: shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
##########
@@ -1237,25 +1238,42 @@ public boolean runDistCpWithSnapshots(String oldSnapshot, String newSnapshot, Li
         LOG.warn("Copy failed with INVALID_ARGUMENT for source: {} to target: {} snapshot1: {} snapshot2: {} "
             + "params: {}", srcPaths, dst, oldSnapshot, newSnapshot, params);
         return true;
-      } else if (returnCode == DistCpConstants.UNKNOWN_ERROR && overwriteTarget) {
+      } else if (returnCode == DistCpConstants.UNKNOWN_ERROR) {
         // Check if this error is due to target modified.
-        if (shouldRdiff(dst, conf, oldSnapshot, overwriteTarget)) {
-          LOG.warn("Copy failed due to target modified. Attempting to restore back the target. source: {} target: {} "
-              + "snapshot: {}", srcPaths, dst, oldSnapshot);
-          List<String> rParams = constructDistCpWithSnapshotParams(srcPaths, dst, ".", oldSnapshot, conf, "-rdiff");
-          DistCp rDistcp = new DistCp(conf, null);
-          returnCode = rDistcp.run(rParams.toArray(new String[0]));
-          if (returnCode == 0) {
-            LOG.info("Target restored to previous state.  source: {} target: {} snapshot: {}. Reattempting to copy.",
-                srcPaths, dst, oldSnapshot);
-            dst.getFileSystem(conf).deleteSnapshot(dst, oldSnapshot);
-            dst.getFileSystem(conf).createSnapshot(dst, oldSnapshot);
-            returnCode = distcp.run(params.toArray(new String[0]));
+        if (targetModified(dst, conf, oldSnapshot)) {
+          if (overwriteTarget) {
+            LOG.warn("Copy failed due to target modified. Attempting to restore back the target. source: {} target: {} "
+                    + "snapshot: {}", srcPaths, dst, oldSnapshot);
+            List<String> rParams = constructDistCpWithSnapshotParams(srcPaths, dst, ".", oldSnapshot, conf, "-rdiff");
+            DistCp rDistcp = new DistCp(conf, null);
+            returnCode = rDistcp.run(rParams.toArray(new String[0]));
             if (returnCode == 0) {
+              LOG.info("Target restored to previous state.  source: {} target: {} snapshot: {}. Reattempting to copy.",
+                      srcPaths, dst, oldSnapshot);
+              dst.getFileSystem(conf).deleteSnapshot(dst, oldSnapshot);
+              dst.getFileSystem(conf).createSnapshot(dst, oldSnapshot);
+              returnCode = distcp.run(params.toArray(new String[0]));
+              if (returnCode == 0) {
+                return true;
+              } else {
+                LOG.error("Copy failed with after target restore for source: {} to target: {} snapshot1: {} snapshot2: "
+                        + "{} params: {}. Return code: {}", srcPaths, dst, oldSnapshot, newSnapshot, params, returnCode);
+                return false;
+              }
+            }
+          } else {
+            //in case overwriteTarget is false, and we encounter an exception due to targetFs getting modified
+            // since last snapshot, then fall back to full distcp
+            LOG.warn("Copy failed due to target modified and overwrite is false. Attempting full distcp." +
+                    "Source:{}, target: {}",srcPaths, dst);
+            // Get the path relative to the initial snapshot for copy.
+            Path snapRelPath = new Path(srcPaths.get(0), HdfsConstants.DOT_SNAPSHOT_DIR + "/" + newSnapshot);
+            // Copy from the initial snapshot path.
+            if(runDistCp(Collections.singletonList(snapRelPath), dst, conf)) {
               return true;
             } else {
-              LOG.error("Copy failed with after target restore for source: {} to target: {} snapshot1: {} snapshot2: "
-                  + "{} params: {}. Return code: {}", srcPaths, dst, oldSnapshot, newSnapshot, params, returnCode);
+              LOG.error("Copy failed with full distcp for source: {} to target: {}  snapshot:"
+                      + "{} params: {}. Return code: {}", srcPaths, dst, newSnapshot, returnCode);

Review comment:
       Corrected
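
The recovery path in the Hadoop23Shims hunk above, taken once the diff-based copy has failed with UNKNOWN_ERROR, can be summarized as follows. This is a sketch under stated assumptions, not the shim code: the class, method and parameter names are hypothetical, the DistCp invocations are replaced by suppliers that report success or failure, and the outcomes for "target not modified" and "restore failed" are assumed to be a failure because the hunk does not show them.

```java
import java.util.function.BooleanSupplier;

public class DiffCopyFallbackSketch {

  /**
   * @param targetModified          whether the target changed since the old snapshot
   * @param overwriteTarget         whether modifications on the target may be discarded
   * @param restoreTargetWithRdiff  stands in for the "-rdiff" DistCp run that restores the target
   * @param retryDiffCopy           stands in for re-creating the old snapshot on the target and
   *                                re-running the diff copy
   * @param fullCopyFromNewSnapshot stands in for a full DistCp from the new snapshot's path
   * @return true if the data ended up copied
   */
  static boolean recover(boolean targetModified, boolean overwriteTarget,
                         BooleanSupplier restoreTargetWithRdiff,
                         BooleanSupplier retryDiffCopy,
                         BooleanSupplier fullCopyFromNewSnapshot) {
    if (!targetModified) {
      // Assumption: an unknown error with an unmodified target is reported as a failure.
      return false;
    }
    if (overwriteTarget) {
      // Restore the target to the old snapshot, then retry the diff copy.
      // Assumption: a failed restore is reported as a failure.
      return restoreTargetWithRdiff.getAsBoolean() && retryDiffCopy.getAsBoolean();
    }
    // Overwrite not allowed: fall back to a full copy taken from the new snapshot.
    return fullCopyFromNewSnapshot.getAsBoolean();
  }

  public static void main(String[] args) {
    boolean copied = recover(true, false, () -> false, () -> false, () -> true);
    System.out.println("full-copy fallback succeeded: " + copied);
  }
}
```

Copying from the snapshot path rather than the live directory keeps the full copy consistent with the snapshot that later incremental runs will diff against.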



