Posted to commits@hive.apache.org by aa...@apache.org on 2021/10/20 05:00:59 UTC

[hive] branch master updated: HIVE-25602: Fix failover metadata file path in repl load execution. (#2707) (Haymant Mangla reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

aasha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a6cd71b  HIVE-25602: Fix failover metadata file path in repl load execution. (#2707) (Haymant Mangla reviewed by Pravin Kumar Sinha)
a6cd71b is described below

commit a6cd71bad4a101785a5655fa62a82d2625d2296f
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Wed Oct 20 10:30:48 2021 +0530

    HIVE-25602: Fix failover metadata file path in repl load execution. (#2707) (Haymant Mangla reviewed by Pravin Kumar Sinha)
    
    * HIVE-25602: Fix failover metadata file path in repl load execution.
    
    * Flaky test correction.
---
 .../parse/TestScheduledReplicationScenarios.java   | 148 +++++++++++++++++++++
 .../incremental/IncrementalLoadTasksBuilder.java   |   2 +-
 2 files changed, 149 insertions(+), 1 deletion(-)
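
The bulk of the patch is a new end-to-end failover test; the functional change is the one-line path fix in IncrementalLoadTasksBuilder at the bottom. Each dump/load cycle in the test is synchronized on an acknowledgement file under the "hive" base dir of the per-cycle dump directory. A minimal sketch of the path being polled, assuming ReplUtils.REPL_HIVE_BASE_DIR resolves to "hive" and ReplAck.LOAD_ACKNOWLEDGEMENT to "_finished_load" (the dump root below is illustrative, not taken from the test):

    import java.io.File;
    import org.apache.hadoop.fs.Path;

    public class AckPathSketch {
      public static void main(String[] args) {
        // The real test derives this root via ReplUtils.getEncodedDumpRootPath(conf, dbName).
        Path dumpRoot = new Path("/user/hive/repl/sourcedbname");
        int next = 1; // per-cycle dump dir, injected through ReplDumpWork in the test
        // <dumpRoot>/<next>/hive/_finished_load is what the test blocks on.
        Path ackPath = new Path(dumpRoot,
            next + File.separator + "hive" + File.separator + "_finished_load");
        System.out.println(ackPath);
        // -> /user/hive/repl/sourcedbname/1/hive/_finished_load
      }
    }

The waitForAck helper that does the blocking lives in the test's base class and is not shown in this diff; a hypothetical sketch of the polling it performs (loop structure and timeout handling here are assumptions, not copied from the Hive source):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class AckPoller {
      // Poll the filesystem until the ack file appears or the timeout elapses.
      static void waitForAck(FileSystem fs, Path ackFile, long timeoutMillis) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
          if (fs.exists(ackFile)) {
            return; // this cycle's scheduled REPL LOAD has completed
          }
          Thread.sleep(1000); // the test's scheduled queries fire every 5 seconds
        }
        throw new AssertionError("Ack file never appeared: " + ackFile);
      }
    }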

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
index 9f8185a..cca89b9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
@@ -22,8 +22,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -252,6 +254,152 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
   }
 
   @Test
+  public void testCompleteFailoverWithReverseBootstrap() throws Throwable {
+    String withClause = "'" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true','"
+            + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'";
+
+    String sourceDbName = "sourceDbName";
+    String replicaDbName = "replicaDbName";
+    // Create a table with some data at source DB.
+    primary.run("create database " + sourceDbName + " with dbproperties('repl.source.for'='a')")
+            .run("use " + sourceDbName)
+            .run("create table t2 (id int)").run("insert into t2 values(1)").run("insert into t2 values(2)");
+
+    // Schedule Dump & Load and verify the data is replicated properly.
+    try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService
+            .startScheduledQueryExecutorService(primary.hiveConf)) {
+      int next = -1;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true);
+      primary.run("create scheduled query repl_dump_p1 every 5 seconds as repl dump "
+              + sourceDbName +  " WITH(" + withClause + ')');
+      replica.run("create scheduled query repl_load_p1 every 5 seconds as repl load "
+              + sourceDbName + " INTO " + replicaDbName +  " WITH(" + withClause + ')');
+
+      Path dumpRoot = ReplUtils.getEncodedDumpRootPath(primary.hiveConf, sourceDbName.toLowerCase());
+      FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf);
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+                      + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      replica.run("use " + replicaDbName).run("show tables like 't2'")
+              .verifyResult("t2").run("select id from t2 order by id")
+              .verifyResults(new String[] {"1", "2"});
+
+      // Start failover from here.
+      String startFailoverClause = withClause.concat(",'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+      primary.run("alter scheduled query repl_dump_p1 defined as repl dump " + sourceDbName +  " WITH(" + startFailoverClause + ')');
+      replica.run("alter scheduled query repl_load_p1 defined as repl load "
+              + sourceDbName + " INTO " + replicaDbName +  " WITH(" + startFailoverClause + ')');
+
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+              String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+                      + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      Path failoverReadyMarker = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+              + File.separator + ReplAck.FAILOVER_READY_MARKER.toString());
+      assertTrue(fs.exists(failoverReadyMarker));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(sourceDbName),
+              MetaStoreUtils.FailoverEndpoint.SOURCE));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicaDbName),
+              MetaStoreUtils.FailoverEndpoint.TARGET));
+
+      primary.run("alter scheduled query repl_dump_p1 disabled")
+              .run("alter scheduled query repl_dump_p1 defined as repl dump "
+                      + sourceDbName +  " WITH(" + withClause + ')')
+              .run("alter database " + sourceDbName + " set dbproperties('" + SOURCE_OF_REPLICATION + "'='')")
+              .run("drop database " + sourceDbName + " cascade");
+
+      assertTrue(primary.getDatabase(sourceDbName) == null);
+
+      replica.run("alter scheduled query repl_load_p1 disabled")
+              .run("alter scheduled query repl_load_p1 defined as repl load "
+                      + sourceDbName + " INTO " + replicaDbName +  " WITH(" + withClause + ')')
+              .run("create scheduled query repl_dump_p2 every 5 seconds as repl dump " + replicaDbName +  " WITH(" + withClause + ')');
+
+      primary.run("create scheduled query repl_load_p2 every 5 seconds as repl load "
+              + replicaDbName + " INTO " + sourceDbName +  " WITH(" + withClause + ')');
+
+      dumpRoot = ReplUtils.getEncodedDumpRootPath(replica.hiveConf, replicaDbName.toLowerCase());
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+              String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+                      + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      assertFalse(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicaDbName)));
+      Database primaryDb = primary.getDatabase(sourceDbName);
+      assertFalse(primaryDb == null);
+      assertTrue(MetaStoreUtils.isTargetOfReplication(primaryDb));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(primaryDb));
+
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+              String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+                      + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(sourceDbName).getParameters()));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(replica.getDatabase(replicaDbName)));
+
+      // Start failback from here.
+      replica.run("alter  scheduled query repl_dump_p2 defined as repl dump " + replicaDbName +  " WITH(" + startFailoverClause + ')');
+      primary.run("alter scheduled query repl_load_p2 defined as repl load "
+              + replicaDbName + " INTO " + sourceDbName +  " WITH(" + startFailoverClause + ')');
+
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+                      + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      failoverReadyMarker = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+              + File.separator + ReplAck.FAILOVER_READY_MARKER.toString());
+      assertTrue(fs.exists(failoverReadyMarker));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicaDbName),
+              MetaStoreUtils.FailoverEndpoint.SOURCE));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(sourceDbName),
+              MetaStoreUtils.FailoverEndpoint.TARGET));
+
+      replica.run("alter scheduled query repl_dump_p2 disabled")
+              .run("alter scheduled query repl_dump_p2 defined as repl dump "
+                      + replicaDbName +  " WITH(" + withClause + ')')
+              .run("alter database " + replicaDbName + " set dbproperties('" + SOURCE_OF_REPLICATION + "'='')")
+              .run("drop database " + replicaDbName + " cascade")
+              .run("alter scheduled query repl_load_p1 enabled");
+
+      assertTrue(replica.getDatabase(replicaDbName) == null);
+
+      primary.run("alter scheduled query repl_load_p2 disabled")
+              .run("alter scheduled query repl_load_p2 defined as repl load "
+                      + replicaDbName + " INTO " + sourceDbName +  " WITH(" + withClause + ')')
+              .run("alter scheduled query repl_dump_p1 enabled");
+
+      dumpRoot = ReplUtils.getEncodedDumpRootPath(primary.hiveConf, sourceDbName.toLowerCase());
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+              + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+
+      assertFalse(MetaStoreUtils.isTargetOfReplication(primary.getDatabase(sourceDbName)));
+      Database replicaDb = replica.getDatabase(replicaDbName);
+      assertFalse(replicaDb == null);
+      assertTrue(MetaStoreUtils.isTargetOfReplication(replicaDb));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(replicaDb));
+
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+              String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+                      + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicaDbName).getParameters()));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(sourceDbName)));
+
+    } finally {
+      primary.run("drop database if exists " + sourceDbName + " cascade").run("drop scheduled query repl_dump_p1");
+      replica.run("drop database if exists " + replicaDbName + " cascade").run("drop scheduled query repl_load_p1");
+      primary.run("drop scheduled query repl_load_p2");
+      replica.run("drop scheduled query repl_dump_p2");
+    }
+  }
+
+  @Test
   public void testSetPolicyId() throws Throwable {
     String withClause =
         " WITH('" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true'" + ",'"
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index ec7e47c..0eabc1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -101,7 +101,7 @@ public class IncrementalLoadTasksBuilder {
     this.shouldFailover = shouldFailover;
     if (shouldFailover) {
       this.metricCollector.reportFailoverStart("REPL_LOAD", metricMap,
-              new FailoverMetaData(new Path(dumpDirectory), conf));
+              new FailoverMetaData(new Path(dumpDirectory, ReplUtils.REPL_HIVE_BASE_DIR), conf));
     } else {
       this.metricCollector.reportStageStart("REPL_LOAD", metricMap);
     }