Posted to commits@hive.apache.org by tc...@apache.org on 2023/01/31 04:53:56 UTC
[hive] branch master updated: HIVE-26606: Expose failover states in replication metrics (Harshal Patel, reviewed by Teddy Choi)
This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0a5558d71c4 HIVE-26606: Expose failover states in replication metrics (Harshal Patel, reviewed by Teddy Choi)
0a5558d71c4 is described below
commit 0a5558d71c474f285c1e6d338a4be93bd1c6ce13
Author: harshal-16 <10...@users.noreply.github.com>
AuthorDate: Tue Jan 31 10:23:45 2023 +0530
HIVE-26606: Expose failover states in replication metrics (Harshal Patel, reviewed by Teddy Choi)
---
.../parse/TestReplicationScenariosAcidTables.java | 8 +-
.../hive/ql/exec/repl/OptimisedBootstrapUtils.java | 2 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 40 ++++++---
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 5 ++
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 10 ++-
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 40 +++++----
.../apache/hadoop/hive/ql/parse/repl/DumpType.java | 12 +++
.../dump/metric/BootstrapDumpMetricCollector.java | 4 +-
.../metric/IncrementalDumpMetricCollector.java | 4 +-
... => OptimizedBootstrapDumpMetricCollector.java} | 11 +--
... PreOptimizedBootstrapDumpMetricCollector.java} | 11 +--
.../hive/ql/parse/repl/load/DumpMetaData.java | 23 +++++
.../OptimizedBootstrapLoadMetricCollector.java} | 12 +--
.../PreOptimizedBootstrapLoadMetricCollector.java} | 13 ++-
.../hive/ql/parse/repl/metric/event/Metadata.java | 4 +-
.../metric/TestReplicationMetricCollector.java | 99 ++++++++++++++++++++--
.../repl/metric/TestReplicationMetricSink.java | 8 +-
.../TestReplicationMetricUpdateOnFailure.java | 6 +-
.../llap/replication_metrics_ingest.q.out | 2 +-
19 files changed, 232 insertions(+), 82 deletions(-)
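In short, the patch adds PRE_OPTIMIZED_BOOTSTRAP and OPTIMIZED_BOOTSTRAP as first-class dump and replication types, records them in the dump metadata, and picks a matching metric collector from that metadata so failover states become visible in the replication metrics. The minimal sketch below mirrors the load-side selection introduced in ReplicationSemanticAnalyzer#initMetricCollection; it assumes a Hive build that contains this commit, and the wrapper class and method names (LoadMetricCollectorSelection, select) are illustrative only, not part of the patch.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.OptimizedBootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;

// Illustrative helper (not in the patch): chooses a load-side metric collector
// from the dump type recorded in _dumpmetadata, the same way the updated
// ReplicationSemanticAnalyzer does, so reverse-replication dumps are reported
// as PRE_OPTIMIZED_BOOTSTRAP / OPTIMIZED_BOOTSTRAP instead of plain INCREMENTAL.
final class LoadMetricCollectorSelection {
  static ReplicationMetricCollector select(String dbName, String dumpDir, DumpMetaData dmd,
                                           HiveConf conf) throws SemanticException {
    long dumpExecutionId = dmd.getDumpExecutionId();
    if (dmd.isPreOptimizedBootstrapDump()) {
      // First dump after failover: only the event_ack file is written.
      return new PreOptimizedBootstrapLoadMetricCollector(dbName, dumpDir, dumpExecutionId, conf);
    } else if (dmd.isOptimizedBootstrapDump()) {
      // Second dump after failover, carrying the table diff.
      return new OptimizedBootstrapLoadMetricCollector(dbName, dumpDir, dumpExecutionId, conf);
    } else if (dmd.isBootstrapDump()) {
      return new BootstrapLoadMetricCollector(dbName, dumpDir, dumpExecutionId, conf);
    }
    return new IncrementalLoadMetricCollector(dbName, dumpDir, dumpExecutionId, conf);
  }
}

The full diff follows.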
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index d09120bcc8c..e23e9d4bb00 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -397,7 +397,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
@@ -625,7 +625,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
assertTrue(fs.exists(dumpAckFile));
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
assertTrue(MetaStoreUtils.isTargetOfReplication(db));
@@ -734,7 +734,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
Path dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
assertTrue(fs.exists(dumpAckFile));
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
assertTrue(MetaStoreUtils.isTargetOfReplication(db));
@@ -748,7 +748,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(fs.exists(new Path(preFailoverDumpData.dumpLocation)));
assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
assertTrue(fs.exists(dumpAckFile));
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index 7074226e14f..9ff0d244bbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -260,7 +260,7 @@ public class OptimisedBootstrapUtils {
LOG.info("Created event_ack file at {} with source eventId {} and target eventId {}", filePath, dbEventId,
targetDbEventId);
work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
- dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L, false);
+ dmd.setDump(DumpType.PRE_OPTIMIZED_BOOTSTRAP, work.eventFrom, lastReplId, cmRoot, -1L, false);
dmd.write(true);
return lastReplId;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 8f2d94d5a6c..aa02b05f618 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -88,9 +88,12 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.OptimizedBootstrapDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.PreOptimizedBootstrapDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata.ReplicationType;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -199,7 +202,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. ");
Path nonRecoverableFile = new Path(latestDumpPath, NON_RECOVERABLE_MARKER.toString());
- ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, nonRecoverableFile.toString(), conf);
+ ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, nonRecoverableFile.toString(), conf, work.dbNameOrPattern, null);
setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format()));
return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode();
}
@@ -226,7 +229,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
String mapRedCustomName = ReplUtils.getDistCpCustomName(conf, work.dbNameOrPattern);
conf.set(JobContext.JOB_NAME, mapRedCustomName);
work.setCurrentDumpPath(currentDumpPath);
- work.setMetricCollector(initMetricCollection(work.isBootstrap(), hiveDumpRoot));
+ work.setMetricCollector(initMetricCollection(work.isBootstrap(), hiveDumpRoot, isFailover));
if (shouldDumpAtlasMetadata()) {
addAtlasDumpTask(work.isBootstrap(), previousValidHiveDumpPath);
LOG.info("Added task to dump atlas metadata.");
@@ -252,9 +255,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
+ Map<String, Long> metricMap = new HashMap<>();
+ metricMap.put(ReplUtils.MetricName.EVENTS.name(), 0L);
+ work.getMetricCollector().reportStageStart(getName(), metricMap);
lastReplId =
createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, targetDbEventId, conf, work);
finishRemainingTasks();
+ work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
} else {
// We should be here only if TableDiff is Present.
boolean isTableDiffDirectoryPresent =
@@ -265,7 +272,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
assert isTableDiffDirectoryPresent;
work.setSecondDumpAfterFailover(true);
-
+ long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+ work.setMetricCollector(new OptimizedBootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId));
long fromEventId = Long.parseLong(getEventIdFromFile(previousValidHiveDumpPath.getParent(), conf)[1]);
LOG.info("Starting optimised bootstrap from event id {} for database {}", fromEventId,
work.dbNameOrPattern);
@@ -303,7 +311,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
} else {
LOG.info("Previous Dump is not yet loaded. Skipping this iteration.");
}
- ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf);
+ ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf,
+ work.dbNameOrPattern, work.isBootstrap() ? ReplicationType.BOOTSTRAP: ReplicationType.INCREMENTAL);
}
}
} catch (RuntimeException e) {
@@ -914,8 +923,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
} finally {
//write the dmd always irrespective of success/failure to enable checkpointing in table level replication
long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
- dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
- previousReplScopeModified());
+ if (work.isSecondDumpAfterFailover()){
+ dmd.setDump(DumpType.OPTIMIZED_BOOTSTRAP, work.eventFrom, lastReplId, cmRoot, executionId,
+ previousReplScopeModified());
+ }
+ else {
+ dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
+ previousReplScopeModified());
+ }
// If repl policy is changed (oldReplScope is set), then pass the current replication policy,
// so that REPL LOAD would drop the tables which are not included in current policy.
dmd.setReplScope(work.replScope);
@@ -1053,12 +1068,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
- private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) {
+ private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot, boolean isFailover) {
ReplicationMetricCollector collector;
+ long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
if (isBootstrap) {
- collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
+ collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId);
} else {
- collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
+ if (isFailover) {
+ collector = new PreOptimizedBootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId);
+ } else {
+ collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId);
+ }
}
return collector;
}
@@ -1425,8 +1445,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
Path backingFile = new Path(dumpRoot, fileName);
return new FileList(backingFile, conf);
}
-
-
private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
try {
return dumpMetaData.getEventFrom() != null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index e2ccd053607..3e7961e9a68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.ql.ddl.privilege.PrincipalDesc;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.thrift.TException;
import com.google.common.collect.Collections2;
import org.apache.commons.lang3.StringUtils;
@@ -776,6 +778,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
LOG.error("The database {} is already source of replication.", targetDb.getName());
throw new Exception("Failover target was not source of replication");
}
+ work.getMetricCollector().reportStageStart(STAGE_NAME, new HashMap<>());
boolean isTableDiffPresent =
checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
boolean isAbortTxnsListPresent =
@@ -800,6 +803,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
this.childTasks = new ArrayList<>();
}
createReplLoadCompleteAckTask();
+ work.getMetricCollector().reportStageEnd(STAGE_NAME, Status.SUCCESS);
+ work.getMetricCollector().reportEnd(Status.SUCCESS);
return 0;
} else if (work.isSecondFailover) {
// DROP the tables extra on target, which are not on source cluster.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index aa2acb19406..607804449b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -68,7 +68,9 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricColle
import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -533,10 +535,12 @@ public class ReplUtils {
}
public static void reportStatusInReplicationMetrics(String stageName, Status status, String errorLogPath,
- HiveConf conf)
+ HiveConf conf, String dbName, Metadata.ReplicationType replicationType)
throws SemanticException {
- ReplicationMetricCollector metricCollector =
- new ReplicationMetricCollector(null, null, null, 0, conf) {};
+ ReplicationMetricCollector metricCollector;
+ metricCollector =
+ new ReplicationMetricCollector(dbName, replicationType, null, 0, conf) {};
+
metricCollector.reportStageStart(stageName, new HashMap<>());
metricCollector.reportStageEnd(stageName, status, errorLogPath);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 34406ee05e8..f32c38be282 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.OptimizedBootstrapLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -97,8 +99,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
try {
analyzeReplDump(ast);
} catch (SemanticException e) {
- ReplUtils.reportStatusInReplicationMetrics("REPL_DUMP", ReplUtils.isErrorRecoverable(e)
- ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+ ReplUtils.reportStatusInReplicationMetrics("REPL_DUMP", ReplUtils.isErrorRecoverable(e)
+ ? Status.FAILED_ADMIN : Status.FAILED, null, conf, null, null);
throw e;
}
break;
@@ -110,7 +112,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
} catch (SemanticException e) {
if (!e.getMessage().equals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg())) {
ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", ReplUtils.isErrorRecoverable(e)
- ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+ ? Status.FAILED_ADMIN : Status.FAILED, null, conf, null, null);
}
throw e;
}
@@ -318,7 +320,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// looking at each db, and then each table, and then setting up the appropriate
// import job in its place.
try {
- assert(sourceDbNameOrPattern != null);
+ assert (sourceDbNameOrPattern != null);
Path loadPath = getCurrentLoadPath();
// Now, the dumped path can be one of three things:
@@ -342,7 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
Path nonRecoverableFile = new Path(latestDumpPath, ReplAck.NON_RECOVERABLE_MARKER.toString());
ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED,
- nonRecoverableFile.toString(), conf);
+ nonRecoverableFile.toString(), conf, sourceDbNameOrPattern, null);
throw new Exception(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg());
}
if (loadPath != null) {
@@ -350,21 +352,24 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
boolean evDump = false;
// we will decide what hdfs locations needs to be copied over here as well.
- if (dmd.isIncrementalDump()) {
- LOG.debug("{} contains an incremental dump", loadPath);
+ if (dmd.isIncrementalDump() || dmd.isOptimizedBootstrapDump() || dmd.isPreOptimizedBootstrapDump()) {
+ LOG.debug("{} contains an incremental / Optimized bootstrap dump", loadPath);
evDump = true;
} else {
LOG.debug("{} contains an bootstrap dump", loadPath);
}
+
ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), sourceDbNameOrPattern,
replScope.getDbName(),
dmd.getReplScope(),
queryState.getLineageState(), evDump, dmd.getEventTo(), dmd.getDumpExecutionId(),
- initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(),
- dmd.getDumpExecutionId()), dmd.isReplScopeModified());
+ initMetricCollection(loadPath.toString(), replScope.getDbName(), dmd), dmd.isReplScopeModified());
rootTasks.add(TaskFactory.get(replLoadWork, conf));
+ if (dmd.isPreOptimizedBootstrapDump()) {
+ dmd.setOptimizedBootstrapToDumpMetadataFile();
+ }
} else {
- ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf);
+ ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern, null);
LOG.warn("Previous Dump Already Loaded");
}
} catch (Exception e) {
@@ -373,13 +378,18 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, String dumpDirectory,
- String dbNameToLoadIn, long dumpExecutionId) {
+ private ReplicationMetricCollector initMetricCollection(String dumpDirectory,
+ String dbNameToLoadIn, DumpMetaData dmd) throws SemanticException {
+
ReplicationMetricCollector collector;
- if (isBootstrap) {
- collector = new BootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dumpExecutionId, conf);
+ if (dmd.isPreOptimizedBootstrapDump()) {
+ collector = new PreOptimizedBootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
+ } else if (dmd.isOptimizedBootstrapDump()) {
+ collector = new OptimizedBootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
+ } else if (dmd.isBootstrapDump()) {
+ collector = new BootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
} else {
- collector = new IncrementalLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dumpExecutionId, conf);
+ collector = new IncrementalLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
}
return collector;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index 58b6d2fcbb5..4ab7c7dd701 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -200,6 +200,18 @@ public enum DumpType {
return new DefaultHandler();
}
},
+ PRE_OPTIMIZED_BOOTSTRAP("PRE_OPTIMIZED_BOOTSTRAP") {
+ @Override
+ public MessageHandler handler() {
+ return new DefaultHandler();
+ }
+ },
+ OPTIMIZED_BOOTSTRAP("OPTIMIZED_BOOTSTRAP") {
+ @Override
+ public MessageHandler handler() {
+ return new DefaultHandler();
+ }
+ },
EVENT_CREATE_DATABASE("EVENT_CREATE_DATABASE") {
@Override
public MessageHandler handler() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
index 48a37b61711..fb458eeb083 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
* Bootstrap Dump Metric Collector
*/
public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
- public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
- super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+ public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+ super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, executorId, conf);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java
index 2c0eb473f3d..1722181b46c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
* Incremental Dump Metric Collector
*/
public class IncrementalDumpMetricCollector extends ReplicationMetricCollector {
- public IncrementalDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
- super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, 0, conf);
+ public IncrementalDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+ super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, executorId, conf);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
similarity index 76%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
index 48a37b61711..ac88cbb541b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
@@ -21,12 +21,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
-/**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
- */
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
- public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
- super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+
+public class OptimizedBootstrapDumpMetricCollector extends ReplicationMetricCollector {
+ public OptimizedBootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+ super(dbName, Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, stagingDir, executorId, conf);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
similarity index 76%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
index 48a37b61711..0684ba9d9de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
@@ -21,12 +21,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
-/**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
- */
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
- public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
- super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+
+public class PreOptimizedBootstrapDumpMetricCollector extends ReplicationMetricCollector {
+ public PreOptimizedBootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+ super(dbName, Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, stagingDir, executorId, conf);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index d4e8e3dfa94..669ecc639b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -193,11 +194,33 @@ public class DumpMetaData {
return DUMP_METADATA;
}
+ public boolean isBootstrapDump() throws SemanticException {
+ initializeIfNot();
+ return (this.dumpType == DumpType.BOOTSTRAP);
+ }
+
public boolean isIncrementalDump() throws SemanticException {
initializeIfNot();
return (this.dumpType == DumpType.INCREMENTAL);
}
+ public boolean isPreOptimizedBootstrapDump() throws SemanticException {
+ initializeIfNot();
+ return (this.dumpType == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
+ }
+
+ public boolean isOptimizedBootstrapDump() throws SemanticException {
+ initializeIfNot();
+ return (this.dumpType == DumpType.OPTIMIZED_BOOTSTRAP);
+ }
+
+ public void setOptimizedBootstrapToDumpMetadataFile() throws SemanticException {
+
+ assert (this.getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
+ this.setDump(DumpType.OPTIMIZED_BOOTSTRAP, -1L, -1L, null, -1L, false);
+ this.write(true);
+ }
+
private void initializeIfNot() throws SemanticException {
if (!initialized) {
loadDumpFromFile();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
similarity index 70%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
index 48a37b61711..8a582772804 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
@@ -15,18 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hive.ql.parse.repl.dump.metric;
+package org.apache.hadoop.hive.ql.parse.repl.load.metric;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
/**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
+ * BootstrapLoadMetricCollector.
+ * Bootstrap Load Metric Collector
*/
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
- public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
- super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+public class OptimizedBootstrapLoadMetricCollector extends ReplicationMetricCollector {
+ public OptimizedBootstrapLoadMetricCollector(String dbName, String stagingDir, long dumpExecutionId, HiveConf conf) {
+ super(dbName, Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, stagingDir, dumpExecutionId, conf);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
similarity index 72%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
index 48a37b61711..3308550c0a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
@@ -15,18 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hive.ql.parse.repl.dump.metric;
+package org.apache.hadoop.hive.ql.parse.repl.load.metric;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
-/**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
- */
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
- public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
- super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+
+public class PreOptimizedBootstrapLoadMetricCollector extends ReplicationMetricCollector {
+ public PreOptimizedBootstrapLoadMetricCollector(String dbName, String stagingDir, long dumpExecutionId, HiveConf conf) {
+ super(dbName, Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, stagingDir, dumpExecutionId, conf);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
index 94668f7b13b..51bcd9434b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
@@ -26,7 +26,9 @@ public class Metadata {
*/
public enum ReplicationType {
BOOTSTRAP,
- INCREMENTAL
+ INCREMENTAL,
+ PRE_OPTIMIZED_BOOTSTRAP,
+ OPTIMIZED_BOOTSTRAP
}
private String dbName;
private ReplicationType replicationType;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
index a78fe9945ef..8ac2896ccf6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.OptimizedBootstrapDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
@@ -122,7 +124,7 @@ public class TestReplicationMetricCollector {
conf = new HiveConf();
MetricCollector.getInstance().init(conf);
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "dummyDir", conf);
+ "dummyDir", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -137,7 +139,7 @@ public class TestReplicationMetricCollector {
conf = new HiveConf();
MetricCollector.getInstance().init(conf);
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "dummyDir", conf);
+ "dummyDir", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -149,7 +151,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessBootstrapDumpMetrics() throws Exception {
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "dummyDir", conf);
+ "dummyDir", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -191,7 +193,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessIncrDumpMetrics() throws Exception {
ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
- "dummyDir", conf);
+ "dummyDir", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -231,10 +233,93 @@ public class TestReplicationMetricCollector {
Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
}
+ @Test
+ public void testSuccessPreOptimizedBootstrapDumpMetrics() throws Exception {
+ ReplicationMetricCollector preOptimizedBootstrapDumpMetricCollector = new PreOptimizedBootstrapLoadMetricCollector("db",
+ "dummyDir",-1, conf);
+ Map<String, Long> metricMap = new HashMap<>();
+ metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 0);
+ metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 0);
+ preOptimizedBootstrapDumpMetricCollector.reportStageStart("dump", metricMap);
+ preOptimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 0);
+ List<ReplicationMetric> actualMetrics = MetricCollector.getInstance().getMetrics();
+ Assert.assertEquals(1, actualMetrics.size());
+
+ preOptimizedBootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, -1, new SnapshotUtils.ReplSnapshotCount(),
+ new ReplStatsTracker(0));
+ preOptimizedBootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
+ actualMetrics = MetricCollector.getInstance().getMetrics();
+ Assert.assertEquals(1, actualMetrics.size());
+
+ Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, "dummyDir");
+ expectedMetadata.setLastReplId(-1);
+ Progress expectedProgress = new Progress();
+ expectedProgress.setStatus(Status.SUCCESS);
+ Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
+ dumpStage.setEndTime(0);
+ Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 0);
+ expectedTableMetric.setCurrentCount(0);
+ Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 0);
+ expectedFuncMetric.setCurrentCount(0);
+ dumpStage.addMetric(expectedTableMetric);
+ dumpStage.addMetric(expectedFuncMetric);
+ expectedProgress.addStage(dumpStage);
+ ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", -1, expectedMetadata);
+ expectedMetric.setProgress(expectedProgress);
+ checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
+ Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+ }
+
+
+
+
+ @Test
+ public void testSuccessOptimizedBootstrapDumpMetrics() throws Exception {
+ ReplicationMetricCollector optimizedBootstrapDumpMetricCollector = new OptimizedBootstrapDumpMetricCollector("db",
+ "dummyDir", conf, 0L);
+ Map<String, Long> metricMap = new HashMap<>();
+ metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
+ metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
+ optimizedBootstrapDumpMetricCollector.reportStageStart("dump", metricMap);
+ optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1);
+ List<ReplicationMetric> actualMetrics = MetricCollector.getInstance().getMetrics();
+ Assert.assertEquals(1, actualMetrics.size());
+
+ optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2);
+ optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1);
+ actualMetrics = MetricCollector.getInstance().getMetrics();
+ Assert.assertEquals(1, actualMetrics.size());
+
+ optimizedBootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10, new SnapshotUtils.ReplSnapshotCount(),
+ new ReplStatsTracker(0));
+ optimizedBootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
+ actualMetrics = MetricCollector.getInstance().getMetrics();
+ Assert.assertEquals(1, actualMetrics.size());
+
+ Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, "dummyDir");
+ expectedMetadata.setLastReplId(10);
+ Progress expectedProgress = new Progress();
+ expectedProgress.setStatus(Status.SUCCESS);
+ Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
+ dumpStage.setEndTime(0);
+ Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10);
+ expectedTableMetric.setCurrentCount(3);
+ Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1);
+ expectedFuncMetric.setCurrentCount(1);
+ dumpStage.addMetric(expectedTableMetric);
+ dumpStage.addMetric(expectedFuncMetric);
+ expectedProgress.addStage(dumpStage);
+ ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 0,
+ expectedMetadata);
+ expectedMetric.setProgress(expectedProgress);
+ checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
+ Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+ }
+
@Test
public void testFailoverReadyDumpMetrics() throws Exception {
ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
- "dummyDir", conf);
+ "dummyDir", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
incrDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
@@ -378,7 +463,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessStageFailure() throws Exception {
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "dummyDir", conf);
+ "dummyDir", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -393,7 +478,7 @@ public class TestReplicationMetricCollector {
@Test
public void testSuccessStageFailedAdmin() throws Exception {
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
- "dummyDir", conf);
+ "dummyDir", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
index 1b3cfd3ddec..5059f9af472 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
@@ -94,7 +94,7 @@ public class TestReplicationMetricSink {
ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector(
"testAcidTablesReplLoadBootstrapIncr_1592205875387",
"hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
- + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+ + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -157,7 +157,7 @@ public class TestReplicationMetricSink {
ReplicationMetricCollector incrementDumpMetricCollector = new IncrementalDumpMetricCollector(
"testAcidTablesReplLoadBootstrapIncr_1592205875387",
"hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
- + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+ + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf, 0L);
metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
incrementDumpMetricCollector.reportStageStart("dump", metricMap);
@@ -217,7 +217,7 @@ public class TestReplicationMetricSink {
String stagingDir = "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
+ "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive/";
ReplicationMetricCollector failoverDumpMetricCollector = new IncrementalDumpMetricCollector(
- "testAcidTablesReplLoadBootstrapIncr_1592205875387", stagingDir, conf);
+ "testAcidTablesReplLoadBootstrapIncr_1592205875387", stagingDir, conf, 0L);
metricMap = new HashMap<String, Long>(){{put(ReplUtils.MetricName.EVENTS.name(), (long) 10);}};
failoverDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
@@ -322,7 +322,7 @@ public class TestReplicationMetricSink {
ReplicationMetricCollector incrementDumpMetricCollector =
new IncrementalDumpMetricCollector("testAcidTablesReplLoadBootstrapIncr_1592205875387",
"hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
- + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+ + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf, 0L);
Map<String, Long> metricMap = new HashMap<>();
ReplStatsTracker repl = Mockito.mock(ReplStatsTracker.class);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
index 72d98c230a5..d8859e3b874 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
@@ -87,7 +87,7 @@ public class TestReplicationMetricUpdateOnFailure {
public void testReplDumpFailure() throws Exception {
String dumpDir = TEST_PATH + Path.SEPARATOR + testName.getMethodName();
IncrementalDumpMetricCollector metricCollector =
- new IncrementalDumpMetricCollector(null, TEST_PATH, conf);
+ new IncrementalDumpMetricCollector(null, TEST_PATH, conf, 0L);
ReplDumpWork replDumpWork = Mockito.mock(ReplDumpWork.class);
Mockito.when(replDumpWork.getCurrentDumpPath()).thenReturn(new Path(dumpDir));
Mockito.when(replDumpWork.getMetricCollector()).thenReturn(metricCollector);
@@ -109,7 +109,7 @@ public class TestReplicationMetricUpdateOnFailure {
String dumpDir = TEST_PATH + Path.SEPARATOR + testName.getMethodName();
MetricCollector.getInstance().deinit();
BootstrapDumpMetricCollector metricCollector =
- new BootstrapDumpMetricCollector(null, TEST_PATH, conf);
+ new BootstrapDumpMetricCollector(null, TEST_PATH, conf, 0L);
ReplDumpWork replDumpWork = Mockito.mock(ReplDumpWork.class);
Mockito.when(replDumpWork.getMetricCollector()).thenReturn(metricCollector);
Mockito.when(replDumpWork.getCurrentDumpPath()).thenReturn(new Path(dumpDir));
@@ -128,7 +128,7 @@ public class TestReplicationMetricUpdateOnFailure {
String dumpDir = TEST_PATH + Path.SEPARATOR + testName.getMethodName();
MetricCollector.getInstance().deinit();
IncrementalDumpMetricCollector metricCollector =
- new IncrementalDumpMetricCollector(null, TEST_PATH, conf);
+ new IncrementalDumpMetricCollector(null, TEST_PATH, conf, 0L);
ReplDumpWork replDumpWork = Mockito.mock(ReplDumpWork.class);
Mockito.when(replDumpWork.getCurrentDumpPath()).thenReturn(new Path(dumpDir));
Mockito.when(replDumpWork.getMetricCollector()).thenReturn(metricCollector);
diff --git a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index 5c9681d3ec1..8bf7c4962cd 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: sys@replication_metrics
POSTHOOK: Input: sys@replication_metrics_orig
#### A masked pattern was here ####
-repl1 0 {"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0} H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA {"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name": [...]
+repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0} H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA {"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name": [...]
repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0} H4sIAAAAAAAAAG2PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA= {"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"en [...]