Posted to commits@hive.apache.org by tc...@apache.org on 2023/01/27 07:36:27 UTC

[hive] branch master updated: HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)

This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new afd2722310c HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)
afd2722310c is described below

commit afd2722310c712b504dff74082f9865c31d5a187
Author: vinitpatni <vi...@gmail.com>
AuthorDate: Fri Jan 27 13:06:15 2023 +0530

    HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 82 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  4 ++
 2 files changed, 86 insertions(+)
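
In short, the fix registers a TABLES count in the dump stage's metric map whenever
the dump still has tables queued for bootstrap, which is the case during the second
cycle of optimised bootstrap. A minimal sketch of the pattern, following the
ReplDumpTask hunk below (metricMap, tablesForBootstrap and work are the surrounding
names in that method):

    // If the reverse dump has bootstrap tables pending, report their count
    // under the TABLES metric before the REPL_DUMP stage starts.
    int size = tablesForBootstrap.size();
    if (size > 0) {
      metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) size);
    }
    work.getMetricCollector().reportStageStart(getName(), metricMap);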

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 396abd24b47..42ef25756ae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -32,11 +32,17 @@ import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncod
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
 import org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import static org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.isMetricsEnabledForTests;
 
 import org.jetbrains.annotations.NotNull;
 import org.junit.After;
@@ -906,6 +912,82 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
         .verifyFailure(new String[]{"tnew_managed"});
   }
 
+  @Test
+  public void testTblMetricRegisterDuringSecondCycleOfOptimizedBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into table t1_managed values (10)")
+            .run("insert into table t1_managed values (20),(31),(42)")
+            .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check the managed table is present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1_managed"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, adding one more managed table.
+    tuple = primary.run("use " + primaryDbName)
+            .run("create table t2_managed (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into table t2_managed values (10)")
+            .run("insert into table t2_managed values (20),(31),(42)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1_managed", "t2_managed"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into table t1_managed values (30)")
+            .run("insert into table t1_managed values (50),(51),(52)");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(false);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+            replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+
+    // Do a load; this should create a table_diff_complete directory.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check that the table diff directory exists.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+            replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+    // Check the table diff contains the modified table.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+            .containsAll(Arrays.asList("t1_managed")));
+
+    isMetricsEnabledForTests(true);
+    replica.dump(replicatedDbName, withClause);
+    MetricCollector collector = MetricCollector.getInstance();
+    ReplicationMetric metric = collector.getMetrics().getLast();
+    Stage stage = metric.getProgress().getStageByName("REPL_DUMP");
+    Metric tableMetric = stage.getMetricByName(ReplUtils.MetricName.TABLES.name());
+    assertEquals(tableDiffEntries.size(), tableMetric.getTotalCount());
+  }
+
   @NotNull
   private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
     List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 50146725def..2cda6b30b59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -867,6 +867,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)) {
         work.getMetricCollector().reportFailoverStart(getName(), metricMap, work.getFailoverMetadata());
       } else {
+        int size = tablesForBootstrap.size();
+        if (size > 0) {
+          metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) size);
+        }
         work.getMetricCollector().reportStageStart(getName(), metricMap);
       }
       long dumpedCount = resumeFrom - work.eventFrom;
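
For reference, the new test verifies the registration by enabling metrics for tests
and inspecting the last collected metric; condensed, the read-side pattern it uses
(names as imported at the top of the test file) is:

    // The REPL_DUMP stage of the most recent metric should carry a TABLES
    // total equal to the number of entries in table_diff_complete.
    isMetricsEnabledForTests(true);
    replica.dump(replicatedDbName, withClause);
    ReplicationMetric metric = MetricCollector.getInstance().getMetrics().getLast();
    Metric tableMetric = metric.getProgress().getStageByName("REPL_DUMP")
        .getMetricByName(ReplUtils.MetricName.TABLES.name());
    assertEquals(tableDiffEntries.size(), tableMetric.getTotalCount());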