You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by tc...@apache.org on 2023/02/08 08:06:51 UTC

[hive] branch master updated: HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi)

This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f7ebbcad5 HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi)
11f7ebbcad5 is described below

commit 11f7ebbcad590fe569ce8f8588f667a6274d657b
Author: Rakshith C <56...@users.noreply.github.com>
AuthorDate: Wed Feb 8 13:36:42 2023 +0530

    HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi)
---
 .../parse/TestReplicationScenariosAcidTables.java  | 61 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  6 ++-
 .../ql/parse/repl/dump/events/EventHandler.java    |  5 ++
 3 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index fda11c127e4..0dfb07f2282 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -55,6 +55,10 @@ import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
@@ -92,6 +96,7 @@ import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUN
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 
+import static org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.isMetricsEnabledForTests;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -3736,4 +3741,60 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     ReplDumpWork.testDeletePreviousDumpMetaPath(false);
   }
+  @Test
+  public void testEventsDumpedCountWithFilteringOfOpenTransactions() throws Throwable { // EVENTS_DUMP count must equal event dirs written when open txns are filtered
+    final int REPL_MAX_LOAD_TASKS = 5; // small cap, presumably so the incremental dump is split into several event batches
+    List<String> incrementalBatchConfigs = Arrays.asList( // batching + txn filtering, used for every dump/load below
+            String.format("'%s'='%s'", HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS, "true"),
+            String.format("'%s'='%d'", HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, REPL_MAX_LOAD_TASKS),
+            String.format("'%s'='%s'", HiveConf.ConfVars.REPL_FILTER_TRANSACTIONS, "true")
+    );
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) // bootstrap: one table with one row
+            .run("create table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .dump(primaryDbName, incrementalBatchConfigs);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); // reused below to inspect the incremental dump dir
+
+    replica.load(replicatedDbName, primaryDbName, incrementalBatchConfigs) // load bootstrap and sanity-check the replica
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1"});
+
+    isMetricsEnabledForTests(true); // NOTE(review): never reset to false in this test; may leak into later tests — confirm
+    MetricCollector collector = MetricCollector.getInstance();
+    // incremental run: four inserts plus one read-only select
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(2)")
+            .run("insert into t1 values(3)")
+            .run("select * from t1")  // opens a read-only transaction; with REPL_FILTER_TRANSACTIONS its events should be filtered
+            .run("insert into t1 values(4)")
+            .run("insert into t1 values(5)")
+            .dump(primaryDbName, incrementalBatchConfigs);
+
+    ReplicationMetric metric = collector.getMetrics().getLast(); // presumably the metric of the incremental dump just run
+    Stage stage = metric.getProgress().getStageByName("REPL_DUMP");
+    Metric eventMetric = stage.getMetricByName(ReplUtils.MetricName.EVENTS.name());
+    long eventCountFromMetrics = eventMetric.getTotalCount(); // stage total; the first assertion below relies on it including filtered events
+
+    Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); // root of the incremental dump
+    Path ackLastEventID = new Path(dumpPath, ReplAck.EVENTS_DUMP.toString()); // EVENTS_DUMP ack file holding EventsDumpMetadata
+    EventsDumpMetadata eventsDumpMetadata = EventsDumpMetadata.deserialize(ackLastEventID, conf);
+
+    int eventsCountInAckFile = eventsDumpMetadata.getEventsDumpedCount(), eventCountFromStagingDir = 0; // persisted count vs. dirs on disk
+
+    String eventsBatchDirPrefix = ReplUtils.INC_EVENTS_BATCH.replaceAll("%d", ""); // strip the "%d" placeholder to get the batch dir prefix
+    List<FileStatus> batchFiles = Arrays.stream(fs.listStatus(dumpPath))
+            .filter(fileStatus -> fileStatus.getPath().getName()
+                    .startsWith(eventsBatchDirPrefix)).collect(Collectors.toList());
+
+    for (FileStatus fileStatus : batchFiles) { // each child of a batch dir is counted as one dumped event
+      eventCountFromStagingDir += fs.listStatus(fileStatus.getPath()).length;
+    }
+    // Read-only open-txn events were filtered, so fewer event dirs exist than the stage's total event count.
+    assertTrue(eventCountFromStagingDir < eventCountFromMetrics); // NOTE(review): the stage's reported progress count is not asserted here
+    // EventsDumpMetadata must count only events actually written (NOTE(review): JUnit order is expected, actual).
+    assertEquals(eventsCountInAckFile, eventCountFromStagingDir);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3e3ae8e18d6..e0b58a64493 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -1254,8 +1254,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     );
     EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
     eventHandler.handle(context);
-    eventsDumpMetadata.incrementEventsDumpedCount();
-    work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1);
+    if (context.isDmdCreated()) { // count only events whose handler called createDmd(), i.e. produced dump metadata
+      eventsDumpMetadata.incrementEventsDumpedCount(); // keeps the EVENTS_DUMP count in line with events actually dumped
+      work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1); // EVENTS progress likewise skips filtered events
+    } // NOTE(review): assumes a fresh Context per event, since the flag is never reset — confirm
     work.getReplLogger().eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString());
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index ae70298ce2b..077ee3ed5ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -45,6 +45,7 @@ public interface EventHandler {
     final ReplScope replScope;
     final ReplScope oldReplScope;
     private Set<String> tablesForBootstrap;
+    private boolean dmdCreated; // set once createDmd() is called on this context; read via isDmdCreated()
 
     public Context(Path eventRoot, Path dumpRoot, Path cmRoot, Hive db, HiveConf hiveConf,
                    ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope,
@@ -77,6 +78,7 @@ public interface EventHandler {
     }
 
     DumpMetaData createDmd(EventHandler eventHandler) {
+      this.dmdCreated = true; // record that the handler for this event produced dump metadata
       return new DumpMetaData(
           eventRoot,
           eventHandler.dumpType(),
@@ -99,5 +101,8 @@ public interface EventHandler {
       assert tableName != null;
       return tablesForBootstrap.remove(tableName.toLowerCase());
     }
+    public boolean isDmdCreated() { // true once createDmd() has been called on this context
+      return dmdCreated;
+    }
   }
 }