You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by tc...@apache.org on 2023/01/27 07:36:27 UTC
[hive] branch master updated: HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)
This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new afd2722310c HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)
afd2722310c is described below
commit afd2722310c712b504dff74082f9865c31d5a187
Author: vinitpatni <vi...@gmail.com>
AuthorDate: Fri Jan 27 13:06:15 2023 +0530
HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)
---
.../parse/TestReplicationOptimisedBootstrap.java | 82 ++++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 4 ++
2 files changed, 86 insertions(+)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 396abd24b47..42ef25756ae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -32,11 +32,17 @@ import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncod
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
import org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import static org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.isMetricsEnabledForTests;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
@@ -906,6 +912,82 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
.verifyFailure(new String[]{"tnew_managed"});
}
+ /**
+  * Verifies that the second cycle of optimised bootstrap (the reverse dump issued after the
+  * table diff has been written) registers the TABLES metric on the REPL_DUMP stage, and that
+  * the metric's total count equals the number of tables listed in the table diff file.
+  *
+  * @throws Throwable if any replication command or assertion fails
+  */
+ @Test
+ public void testTblMetricRegisterDuringSecondCycleOfOptimizedBootstrap() throws Throwable {
+   List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false);
+   withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+   WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+       .run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " +
+           "tblproperties (\"transactional\"=\"true\")")
+       .run("insert into table t1_managed values (10)")
+       .run("insert into table t1_managed values (20),(31),(42)")
+       .dump(primaryDbName, withClause);
+
+   // Do the bootstrap load and check the managed table is present on the replica.
+   replica.load(replicatedDbName, primaryDbName, withClause)
+       .run("repl status " + replicatedDbName)
+       .verifyResult(tuple.lastReplicationId)
+       .run("use " + replicatedDbName)
+       .run("show tables")
+       .verifyResults(new String[]{"t1_managed"})
+       .verifyReplTargetProperty(replicatedDbName);
+
+   // Do an incremental dump & load after adding a second managed table.
+   tuple = primary.run("use " + primaryDbName)
+       .run("create table t2_managed (id int) clustered by(id) into 3 buckets stored as orc " +
+           "tblproperties (\"transactional\"=\"true\")")
+       .run("insert into table t2_managed values (10)")
+       .run("insert into table t2_managed values (20),(31),(42)")
+       .dump(primaryDbName, withClause);
+
+   replica.load(replicatedDbName, primaryDbName, withClause)
+       .run("use " + replicatedDbName)
+       .run("show tables")
+       .verifyResults(new String[]{"t1_managed", "t2_managed"})
+       .verifyReplTargetProperty(replicatedDbName);
+
+   // Modify t1_managed on the primary after the last dump; these rows are never replicated,
+   // so t1_managed is the table expected to show up in the table diff below.
+   primary.run("use " + primaryDbName)
+       .run("insert into table t1_managed values (30)")
+       .run("insert into table t1_managed values (50),(51),(52)");
+
+   // Prepare for reverse replication.
+   DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+   Path newReplDir = new Path(replica.repldDir + "1");
+   replicaFs.mkdirs(newReplDir);
+   withClause = ReplicationTestUtils.includeExternalTableClause(false);
+   withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+   // Do a reverse dump
+   tuple = replica.dump(replicatedDbName, withClause);
+
+   // Check the event ack file got created.
+   assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+       replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+   // Do a load, this should create a table_diff_complete directory
+   primary.load(primaryDbName,replicatedDbName, withClause);
+
+   // Check the table diff directory exist.
+   assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+       replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+   Path dumpPath = new Path(tuple.dumpLocation);
+   // Check the table diff contains the table that was modified on the primary.
+   HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+   assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries.contains("t1_managed"));
+
+   // Second cycle of optimised bootstrap: dump again with metric collection enabled for tests.
+   isMetricsEnabledForTests(true);
+   try {
+     replica.dump(replicatedDbName, withClause);
+     MetricCollector collector = MetricCollector.getInstance();
+     // The most recently collected metric is taken to belong to the dump issued just above.
+     ReplicationMetric metric = collector.getMetrics().getLast();
+     Stage stage = metric.getProgress().getStageByName("REPL_DUMP");
+     Metric tableMetric = stage.getMetricByName(ReplUtils.MetricName.TABLES.name());
+     // Expected value first, actual second, so a failure reports the two the right way round.
+     assertEquals("TABLES metric total should match the number of tables in the table diff",
+         tableDiffEntries.size(), tableMetric.getTotalCount());
+   } finally {
+     // Switch the test-only flag back off so metric collection does not leak into other tests.
+     // NOTE(review): assumes the flag is off by default — confirm against
+     // ReplicationMetricCollector.
+     isMetricsEnabledForTests(false);
+   }
+ }
+
@NotNull
private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 50146725def..2cda6b30b59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -867,6 +867,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)) {
work.getMetricCollector().reportFailoverStart(getName(), metricMap, work.getFailoverMetadata());
} else {
+ int size = tablesForBootstrap.size();
+ if (size > 0) {
+ metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) tablesForBootstrap.size());
+ }
work.getMetricCollector().reportStageStart(getName(), metricMap);
}
long dumpedCount = resumeFrom - work.eventFrom;