You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by tc...@apache.org on 2023/02/08 08:06:51 UTC
[hive] branch master updated: HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi)
This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 11f7ebbcad5 HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi)
11f7ebbcad5 is described below
commit 11f7ebbcad590fe569ce8f8588f667a6274d657b
Author: Rakshith C <56...@users.noreply.github.com>
AuthorDate: Wed Feb 8 13:36:42 2023 +0530
HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi)
---
.../parse/TestReplicationScenariosAcidTables.java | 61 ++++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 6 ++-
.../ql/parse/repl/dump/events/EventHandler.java | 5 ++
3 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index fda11c127e4..0dfb07f2282 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -55,6 +55,10 @@ import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
@@ -92,6 +96,7 @@ import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUN
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.isMetricsEnabledForTests;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -3736,4 +3741,60 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
ReplDumpWork.testDeletePreviousDumpMetaPath(false);
}
+ @Test
+ public void testEventsDumpedCountWithFilteringOfOpenTransactions() throws Throwable { // Events skipped by transaction filtering must not be counted as dumped.
+ final int REPL_MAX_LOAD_TASKS = 5;
+ List<String> incrementalBatchConfigs = Arrays.asList(
+ String.format("'%s'='%s'", HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS, "true"),
+ String.format("'%s'='%d'", HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, REPL_MAX_LOAD_TASKS),
+ String.format("'%s'='%s'", HiveConf.ConfVars.REPL_FILTER_TRANSACTIONS, "true")
+ );
+ // Bootstrap: create t1 with one row and dump it with batching and transaction filtering enabled.
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .dump(primaryDbName, incrementalBatchConfigs);
+ // File system of the dump location; reused below to list the incremental dump's directories.
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+ // Load the bootstrap dump on the replica and check that the single row arrived.
+ replica.load(replicatedDbName, primaryDbName, incrementalBatchConfigs)
+ .run("use " + replicatedDbName)
+ .run("select * from t1")
+ .verifyResults(new String[]{"1"});
+ // Enable metric collection so the incremental dump's REPL_DUMP stage metrics can be read back.
+ isMetricsEnabledForTests(true); // NOTE(review): not reset to false in this method; may leak into later tests — confirm.
+ MetricCollector collector = MetricCollector.getInstance();
+ // Incremental run: four inserts plus a select whose read-only transaction should be filtered out.
+ WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+ .run("insert into t1 values(2)")
+ .run("insert into t1 values(3)")
+ .run("select * from t1") // opens a read-only transaction, which REPL_FILTER_TRANSACTIONS should filter out.
+ .run("insert into t1 values(4)")
+ .run("insert into t1 values(5)")
+ .dump(primaryDbName, incrementalBatchConfigs);
+ // Total count of the EVENTS metric on the most recent run's REPL_DUMP stage.
+ ReplicationMetric metric = collector.getMetrics().getLast();
+ Stage stage = metric.getProgress().getStageByName("REPL_DUMP");
+ Metric eventMetric = stage.getMetricByName(ReplUtils.MetricName.EVENTS.name());
+ long eventCountFromMetrics = eventMetric.getTotalCount();
+ // Events-dumped count persisted in the EVENTS_DUMP ack file under the incremental dump's base dir.
+ Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ Path ackLastEventID = new Path(dumpPath, ReplAck.EVENTS_DUMP.toString());
+ EventsDumpMetadata eventsDumpMetadata = EventsDumpMetadata.deserialize(ackLastEventID, conf);
+
+ int eventsCountInAckFile = eventsDumpMetadata.getEventsDumpedCount(), eventCountFromStagingDir = 0;
+ // Strip "%d" from INC_EVENTS_BATCH to get the batch dir name prefix; entries per batch dir are assumed one per dumped event.
+ String eventsBatchDirPrefix = ReplUtils.INC_EVENTS_BATCH.replaceAll("%d", "");
+ List<FileStatus> batchFiles = Arrays.stream(fs.listStatus(dumpPath))
+ .filter(fileStatus -> fileStatus.getPath().getName()
+ .startsWith(eventsBatchDirPrefix)).collect(Collectors.toList());
+
+ for (FileStatus fileStatus : batchFiles) {
+ eventCountFromStagingDir += fs.listStatus(fileStatus.getPath()).length;
+ }
+ // Fewer entries on disk than the metric's total count shows the filtered (open) transactions were not dumped.
+ assertTrue(eventCountFromStagingDir < eventCountFromMetrics);
+ // The ack file's count must match what was actually written (NOTE(review): args are in (actual, expected) order).
+ assertEquals(eventsCountInAckFile, eventCountFromStagingDir);
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3e3ae8e18d6..e0b58a64493 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -1254,8 +1254,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
);
EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
eventHandler.handle(context);
- eventsDumpMetadata.incrementEventsDumpedCount();
- work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1);
+ if (context.isDmdCreated()) {
+ eventsDumpMetadata.incrementEventsDumpedCount();
+ work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1);
+ }
work.getReplLogger().eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index ae70298ce2b..077ee3ed5ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -45,6 +45,7 @@ public interface EventHandler {
final ReplScope replScope;
final ReplScope oldReplScope;
private Set<String> tablesForBootstrap;
+ private boolean dmdCreated;
public Context(Path eventRoot, Path dumpRoot, Path cmRoot, Hive db, HiveConf hiveConf,
ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope,
@@ -77,6 +78,7 @@ public interface EventHandler {
}
DumpMetaData createDmd(EventHandler eventHandler) {
+ this.dmdCreated = true;
return new DumpMetaData(
eventRoot,
eventHandler.dumpType(),
@@ -99,5 +101,8 @@ public interface EventHandler {
assert tableName != null;
return tablesForBootstrap.remove(tableName.toLowerCase());
}
+ public boolean isDmdCreated() {
+ return dmdCreated;
+ }
}
}