You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by aa...@apache.org on 2021/10/20 05:00:59 UTC
[hive] branch master updated: HIVE-25602: Fix failover metadata file path in repl load execution. (#2707)(Haymant Mangla reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
aasha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a6cd71b HIVE-25602: Fix failover metadata file path in repl load execution. (#2707)(Haymant Mangla reviewed by Pravin Kumar Sinha)
a6cd71b is described below
commit a6cd71bad4a101785a5655fa62a82d2625d2296f
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Wed Oct 20 10:30:48 2021 +0530
HIVE-25602: Fix failover metadata file path in repl load execution. (#2707)(Haymant Mangla reviewed by Pravin Kumar Sinha)
* HIVE-25602: Fix failover metadata file path in repl load execution.
* Flaky test correction.
---
.../parse/TestScheduledReplicationScenarios.java | 148 +++++++++++++++++++++
.../incremental/IncrementalLoadTasksBuilder.java | 2 +-
2 files changed, 149 insertions(+), 1 deletion(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
index 9f8185a..cca89b9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
@@ -22,8 +22,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -252,6 +254,152 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
}
@Test
+  public void testCompleteFailoverWithReverseBootstrap() throws Throwable {
+    // Scenario: replicate sourceDbName (primary) to replicaDbName (replica) with scheduled
+    // dump/load, start a failover, drop the old source and reverse-bootstrap it from the
+    // replica, then fail back in the opposite direction and reverse-bootstrap the replica.
+    String withClause = "'" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true','"
+        + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'";
+
+    String sourceDbName = "sourceDbName";
+    String replicaDbName = "replicaDbName";
+    // Track which scheduled queries were actually created so the finally block drops only
+    // those. An exception thrown from finally would replace (and hide) the test's real failure.
+    // NOTE(review): assumes dropping a scheduled query that does not exist raises an error;
+    // confirm against the metastore implementation.
+    boolean dumpP1Created = false;
+    boolean loadP1Created = false;
+    boolean dumpP2Created = false;
+    boolean loadP2Created = false;
+    // Create a table with some data at source DB.
+    primary.run("create database " + sourceDbName + " with dbproperties('repl.source.for'='a')")
+        .run("use " + sourceDbName)
+        .run("create table t2 (id int)").run("insert into t2 values(1)").run("insert into t2 values(2)");
+
+    // Schedule Dump & Load and verify the data is replicated properly.
+    try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService
+        .startScheduledQueryExecutorService(primary.hiveConf)) {
+      int next = -1;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true);
+      primary.run("create scheduled query repl_dump_p1 every 5 seconds as repl dump "
+          + sourceDbName + " WITH(" + withClause + ')');
+      dumpP1Created = true;
+      replica.run("create scheduled query repl_load_p1 every 5 seconds as repl load "
+          + sourceDbName + " INTO " + replicaDbName + " WITH(" + withClause + ')');
+      loadP1Created = true;
+
+      // Wait for the load acknowledgement in the next dump directory. Its name is derived from
+      // the test-injected dump-dir counter plus one. Then check the replicated rows.
+      Path dumpRoot = ReplUtils.getEncodedDumpRootPath(primary.hiveConf, sourceDbName.toLowerCase());
+      FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf);
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+          + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      replica.run("use " + replicaDbName).run("show tables like 't2'")
+          .verifyResult("t2").run("select id from t2 order by id")
+          .verifyResults(new String[] {"1", "2"});
+
+      //Start failover from here.
+      String startFailoverClause = withClause.concat(",'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+      primary.run("alter scheduled query repl_dump_p1 defined as repl dump " + sourceDbName + " WITH(" + startFailoverClause + ')');
+      replica.run("alter scheduled query repl_load_p1 defined as repl load "
+          + sourceDbName + " INTO " + replicaDbName + " WITH(" + startFailoverClause + ')');
+
+      // After the failover dump/load pair, the dump dir must carry the failover-ready marker.
+      // The source and target DBs must report failover at their respective endpoints.
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+          String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+              + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      Path failoverReadyMarker = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+          + File.separator + ReplAck.FAILOVER_READY_MARKER.toString());
+      assertTrue(fs.exists(failoverReadyMarker));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(sourceDbName),
+          MetaStoreUtils.FailoverEndpoint.SOURCE));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicaDbName),
+          MetaStoreUtils.FailoverEndpoint.TARGET));
+
+      // Retire the old source: stop its dump schedule, reset it to the plain clause, clear the
+      // source-of-replication property and drop the database.
+      primary.run("alter scheduled query repl_dump_p1 disabled")
+          .run("alter scheduled query repl_dump_p1 defined as repl dump "
+              + sourceDbName + " WITH(" + withClause + ')')
+          .run("alter database " + sourceDbName + " set dbproperties('" + SOURCE_OF_REPLICATION + "'='')")
+          .run("drop database " + sourceDbName + " cascade");
+
+      assertTrue(primary.getDatabase(sourceDbName) == null);
+
+      // Reverse direction: the replica now dumps replicaDbName and the primary loads it back
+      // into sourceDbName.
+      replica.run("alter scheduled query repl_load_p1 disabled")
+          .run("alter scheduled query repl_load_p1 defined as repl load "
+              + sourceDbName + " INTO " + replicaDbName + " WITH(" + withClause + ')')
+          .run("create scheduled query repl_dump_p2 every 5 seconds as repl dump " + replicaDbName + " WITH(" + withClause + ')');
+      dumpP2Created = true;
+
+      primary.run("create scheduled query repl_load_p2 every 5 seconds as repl load "
+          + replicaDbName + " INTO " + sourceDbName + " WITH(" + withClause + ')');
+      loadP2Created = true;
+
+      dumpRoot = ReplUtils.getEncodedDumpRootPath(replica.hiveConf, replicaDbName.toLowerCase());
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+          String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+              + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      assertFalse(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicaDbName)));
+      Database primaryDb = primary.getDatabase(sourceDbName);
+      assertFalse(primaryDb == null);
+      assertTrue(MetaStoreUtils.isTargetOfReplication(primaryDb));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(primaryDb));
+
+      // One more reverse cycle: the first-incremental-pending flag and the failover state
+      // must both be cleared.
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+          String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+              + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(sourceDbName).getParameters()));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(replica.getDatabase(replicaDbName)));
+
+      //Start failback from here.
+      replica.run("alter scheduled query repl_dump_p2 defined as repl dump " + replicaDbName + " WITH(" + startFailoverClause + ')');
+      primary.run("alter scheduled query repl_load_p2 defined as repl load "
+          + replicaDbName + " INTO " + sourceDbName + " WITH(" + startFailoverClause + ')');
+
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+          + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      failoverReadyMarker = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+          + File.separator + ReplAck.FAILOVER_READY_MARKER.toString());
+      assertTrue(fs.exists(failoverReadyMarker));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicaDbName),
+          MetaStoreUtils.FailoverEndpoint.SOURCE));
+      assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(sourceDbName),
+          MetaStoreUtils.FailoverEndpoint.TARGET));
+
+      // Retire the replica-side source, re-enable the original load, and drop replicaDbName.
+      replica.run("alter scheduled query repl_dump_p2 disabled")
+          .run("alter scheduled query repl_dump_p2 defined as repl dump "
+              + replicaDbName + " WITH(" + withClause + ')')
+          .run("alter database " + replicaDbName + " set dbproperties('" + SOURCE_OF_REPLICATION + "'='')")
+          .run("drop database " + replicaDbName + " cascade")
+          .run("alter scheduled query repl_load_p1 enabled");
+
+      assertTrue(replica.getDatabase(replicaDbName) == null);
+
+      primary.run("alter scheduled query repl_load_p2 disabled")
+          .run("alter scheduled query repl_load_p2 defined as repl load "
+              + replicaDbName + " INTO " + sourceDbName + " WITH(" + withClause + ')')
+          .run("alter scheduled query repl_dump_p1 enabled");
+
+      // Original direction again: sourceDbName is dumped and reverse-bootstrapped into replicaDbName.
+      dumpRoot = ReplUtils.getEncodedDumpRootPath(primary.hiveConf, sourceDbName.toLowerCase());
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+          + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+
+      assertFalse(MetaStoreUtils.isTargetOfReplication(primary.getDatabase(sourceDbName)));
+      Database replicaDb = replica.getDatabase(replicaDbName);
+      assertFalse(replicaDb == null);
+      assertTrue(MetaStoreUtils.isTargetOfReplication(replicaDb));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(replicaDb));
+
+      next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1;
+      ackPath = new Path(dumpRoot,
+          String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR
+              + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString());
+      waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT);
+      assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicaDbName).getParameters()));
+      assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(sourceDbName)));
+
+    } finally {
+      // Same cleanup order as before, but only for scheduled queries that were created.
+      primary.run("drop database if exists " + sourceDbName + " cascade");
+      if (dumpP1Created) {
+        primary.run("drop scheduled query repl_dump_p1");
+      }
+      replica.run("drop database if exists " + replicaDbName + " cascade");
+      if (loadP1Created) {
+        replica.run("drop scheduled query repl_load_p1");
+      }
+      if (loadP2Created) {
+        primary.run("drop scheduled query repl_load_p2");
+      }
+      if (dumpP2Created) {
+        replica.run("drop scheduled query repl_dump_p2");
+      }
+    }
+  }
+
+ @Test
public void testSetPolicyId() throws Throwable {
String withClause =
" WITH('" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true'" + ",'"
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index ec7e47c..0eabc1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -101,7 +101,7 @@ public class IncrementalLoadTasksBuilder {
this.shouldFailover = shouldFailover;
if (shouldFailover) {
this.metricCollector.reportFailoverStart("REPL_LOAD", metricMap,
- new FailoverMetaData(new Path(dumpDirectory), conf));
+ new FailoverMetaData(new Path(dumpDirectory, ReplUtils.REPL_HIVE_BASE_DIR), conf));
} else {
this.metricCollector.reportStageStart("REPL_LOAD", metricMap);
}