Posted to commits@hive.apache.org by tc...@apache.org on 2023/01/31 04:53:56 UTC

[hive] branch master updated: HIVE-26606: Expose failover states in replication metrics (Harshal Patel, reviewed by Teddy Choi)

This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a5558d71c4 HIVE-26606: Expose failover states in replication metrics (Harshal Patel, reviewed by Teddy Choi)
0a5558d71c4 is described below

commit 0a5558d71c474f285c1e6d338a4be93bd1c6ce13
Author: harshal-16 <10...@users.noreply.github.com>
AuthorDate: Tue Jan 31 10:23:45 2023 +0530

    HIVE-26606: Expose failover states in replication metrics (Harshal Patel, reviewed by Teddy Choi)
---
 .../parse/TestReplicationScenariosAcidTables.java  |  8 +-
 .../hive/ql/exec/repl/OptimisedBootstrapUtils.java |  2 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 40 ++++++---
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  5 ++
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   | 10 ++-
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 40 +++++----
 .../apache/hadoop/hive/ql/parse/repl/DumpType.java | 12 +++
 .../dump/metric/BootstrapDumpMetricCollector.java  |  4 +-
 .../metric/IncrementalDumpMetricCollector.java     |  4 +-
 ... => OptimizedBootstrapDumpMetricCollector.java} | 11 +--
 ... PreOptimizedBootstrapDumpMetricCollector.java} | 11 +--
 .../hive/ql/parse/repl/load/DumpMetaData.java      | 23 +++++
 .../OptimizedBootstrapLoadMetricCollector.java}    | 12 +--
 .../PreOptimizedBootstrapLoadMetricCollector.java} | 13 ++-
 .../hive/ql/parse/repl/metric/event/Metadata.java  |  4 +-
 .../metric/TestReplicationMetricCollector.java     | 99 ++++++++++++++++++++--
 .../repl/metric/TestReplicationMetricSink.java     |  8 +-
 .../TestReplicationMetricUpdateOnFailure.java      |  6 +-
 .../llap/replication_metrics_ingest.q.out          |  2 +-
 19 files changed, 232 insertions(+), 82 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index d09120bcc8c..e23e9d4bb00 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -397,7 +397,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
     dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
@@ -625,7 +625,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
     assertTrue(fs.exists(dumpAckFile));
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
     assertTrue(MetaStoreUtils.isTargetOfReplication(db));
@@ -734,7 +734,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     Path dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
     assertTrue(fs.exists(dumpAckFile));
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
     assertTrue(MetaStoreUtils.isTargetOfReplication(db));
@@ -748,7 +748,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(fs.exists(new Path(preFailoverDumpData.dumpLocation)));
     assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     assertTrue(fs.exists(dumpAckFile));
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
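
The updated assertions encode the new dump-type sequence for a reverse replication cycle: the first dump taken on the old target after failover is now tagged PRE_OPTIMIZED_BOOTSTRAP rather than plain INCREMENTAL. A minimal sketch of the expected progression, using a hypothetical standalone enum instead of the real Hive classes:

    // Hypothetical sketch of the dump-type progression after failover; the
    // real transitions are driven by ReplDumpTask and OptimisedBootstrapUtils.
    enum DumpType { BOOTSTRAP, INCREMENTAL, PRE_OPTIMIZED_BOOTSTRAP, OPTIMIZED_BOOTSTRAP }

    public class ReverseCycleSketch {
      // First reverse dump only writes the event_ack file: PRE_OPTIMIZED_BOOTSTRAP.
      // Second reverse dump carries the actual table data: OPTIMIZED_BOOTSTRAP.
      // Every dump after that is a normal INCREMENTAL.
      static DumpType reverseDumpType(int dumpNumberAfterFailover) {
        switch (dumpNumberAfterFailover) {
          case 1:  return DumpType.PRE_OPTIMIZED_BOOTSTRAP;
          case 2:  return DumpType.OPTIMIZED_BOOTSTRAP;
          default: return DumpType.INCREMENTAL;
        }
      }

      public static void main(String[] args) {
        for (int i = 1; i <= 3; i++) {
          System.out.println("reverse dump " + i + " -> " + reverseDumpType(i));
        }
      }
    }
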
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index 7074226e14f..9ff0d244bbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -260,7 +260,7 @@ public class OptimisedBootstrapUtils {
     LOG.info("Created event_ack file at {} with source eventId {} and target eventId {}", filePath, dbEventId,
         targetDbEventId);
     work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
-    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L, false);
+    dmd.setDump(DumpType.PRE_OPTIMIZED_BOOTSTRAP, work.eventFrom, lastReplId, cmRoot, -1L, false);
     dmd.write(true);
     return lastReplId;
   }
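
This is where the first reverse dump's metadata gets its new label: createAndGetEventAckFile persists PRE_OPTIMIZED_BOOTSTRAP through DumpMetaData.setDump before writing the metadata file, and REPL LOAD later reads that type back to classify the run. A rough, hypothetical mirror of the call shape (the real setDump also takes the cmRoot path, an execution id and a repl-scope flag):

    // Hypothetical, simplified stand-in for DumpMetaData as used above.
    enum DumpType { INCREMENTAL, PRE_OPTIMIZED_BOOTSTRAP }

    public class EventAckSketch {
      private DumpType dumpType;
      private Long eventFrom;
      private Long eventTo;

      void setDump(DumpType type, Long eventFrom, Long eventTo) {
        this.dumpType = type;
        this.eventFrom = eventFrom;
        this.eventTo = eventTo;
      }

      // In Hive this serializes the state into the dump directory.
      void write(boolean replace) {
        System.out.println(dumpType + " [" + eventFrom + ", " + eventTo + "]");
      }

      public static void main(String[] args) {
        EventAckSketch dmd = new EventAckSketch();
        dmd.setDump(DumpType.PRE_OPTIMIZED_BOOTSTRAP, 0L, 100L);
        dmd.write(true);
      }
    }
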
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 8f2d94d5a6c..aa02b05f618 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -88,9 +88,12 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.OptimizedBootstrapDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.PreOptimizedBootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata.ReplicationType;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -199,7 +202,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
           LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. ");
           Path nonRecoverableFile = new Path(latestDumpPath, NON_RECOVERABLE_MARKER.toString());
-          ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, nonRecoverableFile.toString(), conf);
+          ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, nonRecoverableFile.toString(), conf, work.dbNameOrPattern, null);
           setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format()));
           return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode();
         }
@@ -226,7 +229,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           String mapRedCustomName = ReplUtils.getDistCpCustomName(conf, work.dbNameOrPattern);
           conf.set(JobContext.JOB_NAME, mapRedCustomName);
           work.setCurrentDumpPath(currentDumpPath);
-          work.setMetricCollector(initMetricCollection(work.isBootstrap(), hiveDumpRoot));
+          work.setMetricCollector(initMetricCollection(work.isBootstrap(), hiveDumpRoot, isFailover));
           if (shouldDumpAtlasMetadata()) {
             addAtlasDumpTask(work.isBootstrap(), previousValidHiveDumpPath);
             LOG.info("Added task to dump atlas metadata.");
@@ -252,9 +255,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
 
               LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
+              Map<String, Long> metricMap = new HashMap<>();
+              metricMap.put(ReplUtils.MetricName.EVENTS.name(), 0L);
+              work.getMetricCollector().reportStageStart(getName(), metricMap);
               lastReplId =
                   createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, targetDbEventId, conf, work);
               finishRemainingTasks();
+              work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
             } else {
               // We should be here only if TableDiff is Present.
               boolean isTableDiffDirectoryPresent =
@@ -265,7 +272,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               assert isTableDiffDirectoryPresent;
 
               work.setSecondDumpAfterFailover(true);
-
+              long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+              work.setMetricCollector(new OptimizedBootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId));
               long fromEventId = Long.parseLong(getEventIdFromFile(previousValidHiveDumpPath.getParent(), conf)[1]);
               LOG.info("Starting optimised bootstrap from event id {} for database {}", fromEventId,
                   work.dbNameOrPattern);
@@ -303,7 +311,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           } else {
             LOG.info("Previous Dump is not yet loaded. Skipping this iteration.");
           }
-          ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf);
+          ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf,
+                  work.dbNameOrPattern, work.isBootstrap() ? ReplicationType.BOOTSTRAP : ReplicationType.INCREMENTAL);
         }
       }
     } catch (RuntimeException e) {
@@ -914,8 +923,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     } finally {
       //write the dmd always irrespective of success/failure to enable checkpointing in table level replication
       long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
-      dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
-        previousReplScopeModified());
+      if (work.isSecondDumpAfterFailover()) {
+        dmd.setDump(DumpType.OPTIMIZED_BOOTSTRAP, work.eventFrom, lastReplId, cmRoot, executionId,
+                previousReplScopeModified());
+      } else {
+        dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
+                previousReplScopeModified());
+      }
       // If repl policy is changed (oldReplScope is set), then pass the current replication policy,
       // so that REPL LOAD would drop the tables which are not included in current policy.
       dmd.setReplScope(work.replScope);
@@ -1053,12 +1068,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
   }
 
-  private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) {
+  private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot, boolean isFailover) {
     ReplicationMetricCollector collector;
+    long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
     if (isBootstrap) {
-      collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
+      collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId);
     } else {
-      collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
+      if (isFailover) {
+        collector = new PreOptimizedBootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId);
+      } else {
+        collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf, executorId);
+      }
     }
     return collector;
   }
@@ -1425,8 +1445,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     Path backingFile = new Path(dumpRoot, fileName);
     return new FileList(backingFile, conf);
   }
-
-
   private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
     try {
       return dumpMetaData.getEventFrom() != null;
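
Two collector-selection changes run through this file. At dump start, initMetricCollection now distinguishes the pre-optimized failover state from a plain incremental dump and threads the scheduled-query execution id through instead of a hard-coded 0; later, when the second dump after failover is detected, the collector is swapped for an optimized-bootstrap one. A condensed sketch of the start-time dispatch, with hypothetical stand-in classes:

    // Hypothetical stand-ins; the real collectors live under
    // org.apache.hadoop.hive.ql.parse.repl.dump.metric.
    interface Collector {}
    class BootstrapDump implements Collector {}
    class IncrementalDump implements Collector {}
    class PreOptimizedBootstrapDump implements Collector {}

    public class DumpCollectorDispatch {
      // Mirrors initMetricCollection(isBootstrap, dumpRoot, isFailover):
      // bootstrap wins, then the failover (pre-optimized) state, else incremental.
      static Collector pick(boolean isBootstrap, boolean isFailover) {
        if (isBootstrap) {
          return new BootstrapDump();
        }
        return isFailover ? new PreOptimizedBootstrapDump() : new IncrementalDump();
      }

      public static void main(String[] args) {
        System.out.println(pick(false, true).getClass().getSimpleName());
      }
    }
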
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index e2ccd053607..3e7961e9a68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.ql.ddl.privilege.PrincipalDesc;
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.thrift.TException;
 import com.google.common.collect.Collections2;
 import org.apache.commons.lang3.StringUtils;
@@ -776,6 +778,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         LOG.error("The database {} is already source of replication.", targetDb.getName());
         throw new Exception("Failover target was not source of replication");
       }
+      work.getMetricCollector().reportStageStart(STAGE_NAME, new HashMap<>());
       boolean isTableDiffPresent =
           checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
       boolean isAbortTxnsListPresent =
@@ -800,6 +803,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         this.childTasks = new ArrayList<>();
       }
       createReplLoadCompleteAckTask();
+      work.getMetricCollector().reportStageEnd(STAGE_NAME, Status.SUCCESS);
+      work.getMetricCollector().reportEnd(Status.SUCCESS);
       return 0;
     } else if (work.isSecondFailover) {
       // DROP the tables extra on target, which are not on source cluster.
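
The first failover load now brackets its work with explicit stage events, so the pre-optimized phase becomes visible in the metrics sink instead of returning 0 without reporting anything. A minimal sketch of the reporting lifecycle, assuming a simplified stand-in for ReplicationMetricCollector:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, simplified collector; the real API is
    // org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.
    public class LoadStageLifecycleSketch {
      void reportStageStart(String stage, Map<String, Long> metrics) {
        System.out.println("start " + stage + " " + metrics);
      }
      void reportStageEnd(String stage, String status) {
        System.out.println("end " + stage + " -> " + status);
      }
      void reportEnd(String status) {
        System.out.println("run -> " + status);
      }

      public static void main(String[] args) {
        LoadStageLifecycleSketch c = new LoadStageLifecycleSketch();
        c.reportStageStart("REPL_LOAD", new HashMap<>()); // before the table-diff checks
        // ... schedule child tasks, create the load-complete ack ...
        c.reportStageEnd("REPL_LOAD", "SUCCESS");
        c.reportEnd("SUCCESS");
      }
    }
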
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index aa2acb19406..607804449b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -68,7 +68,9 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricColle
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -533,10 +535,12 @@ public class ReplUtils {
   }
 
   public static void reportStatusInReplicationMetrics(String stageName, Status status, String errorLogPath,
-                                                      HiveConf conf)
+                                                      HiveConf conf, String dbName, Metadata.ReplicationType replicationType)
           throws SemanticException {
-    ReplicationMetricCollector metricCollector =
-            new ReplicationMetricCollector(null, null, null, 0, conf) {};
+    ReplicationMetricCollector metricCollector =
+            new ReplicationMetricCollector(dbName, replicationType, null, 0, conf) {};
     metricCollector.reportStageStart(stageName, new HashMap<>());
     metricCollector.reportStageEnd(stageName, status, errorLogPath);
   }
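
The helper's widened signature means even SKIPPED and FAILED runs can be attributed to a database and a replication type; both parameters stay nullable because some failures happen before either is known. A sketch of a call under the new shape, using hypothetical stand-ins (in Hive, ReplicationMetricCollector is abstract, hence the anonymous subclass with an empty body):

    // Hypothetical mirror of reportStatusInReplicationMetrics above.
    abstract class CollectorBase {
      CollectorBase(String dbName, String replicationType) {
        System.out.println("collector for " + dbName + " (" + replicationType + ")");
      }
    }

    public class ReportStatusSketch {
      static void reportStatus(String stage, String status, String dbName, String replicationType) {
        CollectorBase collector = new CollectorBase(dbName, replicationType) {};
        System.out.println(stage + " -> " + status);
      }

      public static void main(String[] args) {
        // replicationType may legitimately be null, e.g. in the REPL_DUMP
        // failure path where the dump type was never determined.
        reportStatus("REPL_DUMP", "SKIPPED", "srcdb", null);
      }
    }
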
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 34406ee05e8..f32c38be282 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.OptimizedBootstrapLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -97,8 +99,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         try {
           analyzeReplDump(ast);
         } catch (SemanticException e) {
-          ReplUtils.reportStatusInReplicationMetrics("REPL_DUMP", ReplUtils.isErrorRecoverable(e)
-                  ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+          ReplUtils.reportStatusInReplicationMetrics("REPL_DUMP", ReplUtils.isErrorRecoverable(e)
+                  ? Status.FAILED_ADMIN : Status.FAILED, null, conf, null, null);
           throw e;
         }
         break;
@@ -110,7 +112,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         } catch (SemanticException e) {
           if (!e.getMessage().equals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg())) {
             ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", ReplUtils.isErrorRecoverable(e)
-                    ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+                    ? Status.FAILED_ADMIN : Status.FAILED, null, conf, null, null);
           }
           throw e;
         }
@@ -318,7 +320,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     // looking at each db, and then each table, and then setting up the appropriate
     // import job in its place.
     try {
-      assert(sourceDbNameOrPattern != null);
+      assert (sourceDbNameOrPattern != null);
       Path loadPath = getCurrentLoadPath();
 
       // Now, the dumped path can be one of three things:
@@ -342,7 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
         Path nonRecoverableFile = new Path(latestDumpPath, ReplAck.NON_RECOVERABLE_MARKER.toString());
         ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED,
-                nonRecoverableFile.toString(), conf);
+                nonRecoverableFile.toString(), conf, sourceDbNameOrPattern, null);
         throw new Exception(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg());
       }
       if (loadPath != null) {
@@ -350,21 +352,24 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
         boolean evDump = false;
         // we will decide what hdfs locations needs to be copied over here as well.
-        if (dmd.isIncrementalDump()) {
-          LOG.debug("{} contains an incremental dump", loadPath);
+        if (dmd.isIncrementalDump() || dmd.isOptimizedBootstrapDump() || dmd.isPreOptimizedBootstrapDump()) {
+          LOG.debug("{} contains an incremental / Optimized bootstrap dump", loadPath);
           evDump = true;
         } else {
           LOG.debug("{} contains an bootstrap dump", loadPath);
         }
+
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), sourceDbNameOrPattern,
                 replScope.getDbName(),
                 dmd.getReplScope(),
                 queryState.getLineageState(), evDump, dmd.getEventTo(), dmd.getDumpExecutionId(),
-            initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(),
-              dmd.getDumpExecutionId()), dmd.isReplScopeModified());
+                initMetricCollection(loadPath.toString(), replScope.getDbName(), dmd), dmd.isReplScopeModified());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
+        if (dmd.isPreOptimizedBootstrapDump()) {
+          dmd.setOptimizedBootstrapToDumpMetadataFile();
+        }
       } else {
-        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf);
+        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern, null);
         LOG.warn("Previous Dump Already Loaded");
       }
     } catch (Exception e) {
@@ -373,13 +378,18 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, String dumpDirectory,
-                                                          String dbNameToLoadIn, long dumpExecutionId) {
+  private ReplicationMetricCollector initMetricCollection(String dumpDirectory,
+                                                          String dbNameToLoadIn, DumpMetaData dmd) throws SemanticException {
+
     ReplicationMetricCollector collector;
-    if (isBootstrap) {
-      collector = new BootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dumpExecutionId, conf);
+    if (dmd.isPreOptimizedBootstrapDump()) {
+      collector = new PreOptimizedBootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
+    } else if (dmd.isOptimizedBootstrapDump()) {
+      collector = new OptimizedBootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
+    } else if (dmd.isBootstrapDump()) {
+      collector = new BootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
     } else {
-      collector = new IncrementalLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dumpExecutionId, conf);
+      collector = new IncrementalLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
     }
     return collector;
   }
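
On the load side the boolean isBootstrap flag is gone entirely: the analyzer asks DumpMetaData for the persisted dump type and dispatches over all four states, and for a pre-optimized dump it then flips the metadata file to OPTIMIZED_BOOTSTRAP after scheduling the load (see setOptimizedBootstrapToDumpMetadataFile below). A condensed sketch of the dispatch with hypothetical stand-ins:

    // Hypothetical mirror of initMetricCollection(dumpDirectory, dbNameToLoadIn, dmd).
    enum DumpType { BOOTSTRAP, INCREMENTAL, PRE_OPTIMIZED_BOOTSTRAP, OPTIMIZED_BOOTSTRAP }

    public class LoadCollectorDispatch {
      static String pick(DumpType type) {
        switch (type) {
          case PRE_OPTIMIZED_BOOTSTRAP: return "PreOptimizedBootstrapLoadMetricCollector";
          case OPTIMIZED_BOOTSTRAP:     return "OptimizedBootstrapLoadMetricCollector";
          case BOOTSTRAP:               return "BootstrapLoadMetricCollector";
          default:                      return "IncrementalLoadMetricCollector";
        }
      }

      public static void main(String[] args) {
        System.out.println(pick(DumpType.PRE_OPTIMIZED_BOOTSTRAP));
      }
    }
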
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index 58b6d2fcbb5..4ab7c7dd701 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -200,6 +200,18 @@ public enum DumpType {
       return new DefaultHandler();
     }
   },
+  PRE_OPTIMIZED_BOOTSTRAP("PRE_OPTIMIZED_BOOTSTRAP") {
+    @Override
+    public MessageHandler handler() {
+      return new DefaultHandler();
+    }
+  },
+  OPTIMIZED_BOOTSTRAP("OPTIMIZED_BOOTSTRAP") {
+    @Override
+    public MessageHandler handler() {
+      return new DefaultHandler();
+    }
+  },
   EVENT_CREATE_DATABASE("EVENT_CREATE_DATABASE") {
     @Override
     public MessageHandler handler() {
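
The two new DumpType constants follow the enum's existing constant-specific-method pattern: each constant overrides handler(), and both return DefaultHandler since these states introduce no events of their own. The pattern in isolation, with hypothetical stand-ins for the handler types:

    // Generic illustration of the pattern used by DumpType; Handler and
    // DefaultHandler here are hypothetical stand-ins, not the Hive classes.
    interface Handler { String describe(); }
    class DefaultHandler implements Handler {
      public String describe() { return "default"; }
    }

    enum DumpTypeSketch {
      PRE_OPTIMIZED_BOOTSTRAP {
        @Override public Handler handler() { return new DefaultHandler(); }
      },
      OPTIMIZED_BOOTSTRAP {
        @Override public Handler handler() { return new DefaultHandler(); }
      };

      public abstract Handler handler();
    }

    public class DumpTypePatternSketch {
      public static void main(String[] args) {
        System.out.println(DumpTypeSketch.OPTIMIZED_BOOTSTRAP.handler().describe());
      }
    }
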
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
index 48a37b61711..fb458eeb083 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
  * Bootstrap Dump Metric Collector
  */
 public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
-  public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
-    super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+  public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+    super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, executorId, conf);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java
index 2c0eb473f3d..1722181b46c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
  * Incremental Dump Metric Collector
  */
 public class IncrementalDumpMetricCollector extends ReplicationMetricCollector {
-  public IncrementalDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
-    super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, 0, conf);
+  public IncrementalDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+    super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, executorId, conf);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
similarity index 76%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
index 48a37b61711..ac88cbb541b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
@@ -21,12 +21,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
 
-/**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
- */
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
-  public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
-    super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+/**
+ * Optimized Bootstrap Dump Metric Collector.
+ */
+public class OptimizedBootstrapDumpMetricCollector extends ReplicationMetricCollector {
+  public OptimizedBootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+    super(dbName, Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, stagingDir, executorId, conf);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
similarity index 76%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
index 48a37b61711..0684ba9d9de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
@@ -21,12 +21,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
 
-/**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
- */
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
-  public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
-    super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+/**
+ * Pre-Optimized Bootstrap Dump Metric Collector.
+ */
+public class PreOptimizedBootstrapDumpMetricCollector extends ReplicationMetricCollector {
+  public PreOptimizedBootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf, Long executorId) {
+    super(dbName, Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, stagingDir, executorId, conf);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index d4e8e3dfa94..669ecc639b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -193,11 +194,33 @@ public class DumpMetaData {
     return DUMP_METADATA;
   }
 
+  public boolean isBootstrapDump() throws SemanticException {
+    initializeIfNot();
+    return (this.dumpType == DumpType.BOOTSTRAP);
+  }
+
   public boolean isIncrementalDump() throws SemanticException {
     initializeIfNot();
     return (this.dumpType == DumpType.INCREMENTAL);
   }
 
+  public boolean isPreOptimizedBootstrapDump() throws SemanticException {
+    initializeIfNot();
+    return (this.dumpType == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
+  }
+
+  public boolean isOptimizedBootstrapDump() throws SemanticException {
+    initializeIfNot();
+    return (this.dumpType == DumpType.OPTIMIZED_BOOTSTRAP);
+  }
+
+  public void setOptimizedBootstrapToDumpMetadataFile() throws SemanticException {
+    assert (this.getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
+    this.setDump(DumpType.OPTIMIZED_BOOTSTRAP, -1L, -1L, null, -1L, false);
+    this.write(true);
+  }
+
   private void initializeIfNot() throws SemanticException {
     if (!initialized) {
       loadDumpFromFile();
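
setOptimizedBootstrapToDumpMetadataFile is the one-way transition between the two new states: it asserts the file currently says PRE_OPTIMIZED_BOOTSTRAP, then rewrites it as OPTIMIZED_BOOTSTRAP with -1 sentinels in place of the event window. A standalone sketch of that guard-and-rewrite with a hypothetical in-memory stand-in (using an explicit exception where the original relies on assert):

    // Hypothetical in-memory mirror of the DumpMetaData transition above.
    enum DumpType { PRE_OPTIMIZED_BOOTSTRAP, OPTIMIZED_BOOTSTRAP }

    public class MetadataFlipSketch {
      private DumpType dumpType = DumpType.PRE_OPTIMIZED_BOOTSTRAP;

      void setOptimizedBootstrap() {
        // Only valid from the pre-optimized state.
        if (dumpType != DumpType.PRE_OPTIMIZED_BOOTSTRAP) {
          throw new IllegalStateException("expected PRE_OPTIMIZED_BOOTSTRAP, was " + dumpType);
        }
        dumpType = DumpType.OPTIMIZED_BOOTSTRAP;
      }

      public static void main(String[] args) {
        MetadataFlipSketch dmd = new MetadataFlipSketch();
        dmd.setOptimizedBootstrap();
        System.out.println(dmd.dumpType); // OPTIMIZED_BOOTSTRAP
      }
    }
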
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
similarity index 70%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
index 48a37b61711..8a582772804 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
@@ -15,18 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.dump.metric;
+package org.apache.hadoop.hive.ql.parse.repl.load.metric;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
 
 /**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
+ * OptimizedBootstrapLoadMetricCollector.
+ * Optimized Bootstrap Load Metric Collector
  */
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
-  public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
-    super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+public class OptimizedBootstrapLoadMetricCollector extends ReplicationMetricCollector {
+  public OptimizedBootstrapLoadMetricCollector(String dbName, String stagingDir, long dumpExecutionId, HiveConf conf) {
+    super(dbName, Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, stagingDir, dumpExecutionId, conf);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
similarity index 72%
copy from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
copy to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
index 48a37b61711..3308550c0a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
@@ -15,18 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.dump.metric;
+package org.apache.hadoop.hive.ql.parse.repl.load.metric;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
 
-/**
- * BootstrapDumpMetricCollector.
- * Bootstrap Dump Metric Collector
- */
-public class BootstrapDumpMetricCollector extends ReplicationMetricCollector {
-  public BootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
-    super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, 0, conf);
+/**
+ * Pre-Optimized Bootstrap Load Metric Collector.
+ */
+public class PreOptimizedBootstrapLoadMetricCollector extends ReplicationMetricCollector {
+  public PreOptimizedBootstrapLoadMetricCollector(String dbName, String stagingDir, long dumpExecutionId, HiveConf conf) {
+    super(dbName, Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, stagingDir, dumpExecutionId, conf);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
index 94668f7b13b..51bcd9434b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
@@ -26,7 +26,9 @@ public class Metadata {
    */
   public enum ReplicationType {
     BOOTSTRAP,
-    INCREMENTAL
+    INCREMENTAL,
+    PRE_OPTIMIZED_BOOTSTRAP,
+    OPTIMIZED_BOOTSTRAP
   }
   private String dbName;
   private ReplicationType replicationType;
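
These enum values are what ultimately surface to users: Metadata is serialized into the sys.replication_metrics table, and the new constants appear verbatim in its "replicationType" JSON field (compare the q.out change at the end of this patch). A tiny sketch of how an enum name lands in that field, assuming plain name() serialization:

    // Hypothetical sketch; Hive uses a JSON mapper, but the visible value
    // is just the enum constant's name.
    enum ReplicationType { BOOTSTRAP, INCREMENTAL, PRE_OPTIMIZED_BOOTSTRAP, OPTIMIZED_BOOTSTRAP }

    public class MetadataJsonSketch {
      public static void main(String[] args) {
        ReplicationType type = ReplicationType.PRE_OPTIMIZED_BOOTSTRAP;
        System.out.println("{\"dbName\":\"src\",\"replicationType\":\"" + type.name() + "\"}");
      }
    }
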
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
index a78fe9945ef..8ac2896ccf6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.OptimizedBootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
@@ -122,7 +124,7 @@ public class TestReplicationMetricCollector {
     conf = new HiveConf();
     MetricCollector.getInstance().init(conf);
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-        "dummyDir", conf);
+        "dummyDir", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -137,7 +139,7 @@ public class TestReplicationMetricCollector {
     conf = new HiveConf();
     MetricCollector.getInstance().init(conf);
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-        "dummyDir", conf);
+        "dummyDir", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -149,7 +151,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessBootstrapDumpMetrics() throws Exception {
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-        "dummyDir", conf);
+        "dummyDir", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -191,7 +193,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessIncrDumpMetrics() throws Exception {
     ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
-        "dummyDir", conf);
+        "dummyDir", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -231,10 +233,93 @@ public class TestReplicationMetricCollector {
         Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
   }
 
+  @Test
+  public void testSuccessPreOptimizedBootstrapDumpMetrics() throws Exception {
+    ReplicationMetricCollector preOptimizedBootstrapDumpMetricCollector = new PreOptimizedBootstrapLoadMetricCollector("db",
+            "dummyDir",-1, conf);
+    Map<String, Long> metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 0);
+    metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 0);
+    preOptimizedBootstrapDumpMetricCollector.reportStageStart("dump", metricMap);
+    preOptimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 0);
+    List<ReplicationMetric> actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    preOptimizedBootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, -1, new SnapshotUtils.ReplSnapshotCount(),
+            new ReplStatsTracker(0));
+    preOptimizedBootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, "dummyDir");
+    expectedMetadata.setLastReplId(-1);
+    Progress expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.SUCCESS);
+    Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
+    dumpStage.setEndTime(0);
+    Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 0);
+    expectedTableMetric.setCurrentCount(0);
+    Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 0);
+    expectedFuncMetric.setCurrentCount(0);
+    dumpStage.addMetric(expectedTableMetric);
+    dumpStage.addMetric(expectedFuncMetric);
+    expectedProgress.addStage(dumpStage);
+    ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", -1, expectedMetadata);
+    expectedMetric.setProgress(expectedProgress);
+    checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
+            Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+  }
+
+  @Test
+  public void testSuccessOptimizedBootstrapDumpMetrics() throws Exception {
+    ReplicationMetricCollector optimizedBootstrapDumpMetricCollector = new OptimizedBootstrapDumpMetricCollector("db",
+            "dummyDir", conf, 0L);
+    Map<String, Long> metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
+    metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
+    optimizedBootstrapDumpMetricCollector.reportStageStart("dump", metricMap);
+    optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1);
+    List<ReplicationMetric> actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2);
+    optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    optimizedBootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10, new SnapshotUtils.ReplSnapshotCount(),
+            new ReplStatsTracker(0));
+    optimizedBootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, "dummyDir");
+    expectedMetadata.setLastReplId(10);
+    Progress expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.SUCCESS);
+    Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
+    dumpStage.setEndTime(0);
+    Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10);
+    expectedTableMetric.setCurrentCount(3);
+    Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1);
+    expectedFuncMetric.setCurrentCount(1);
+    dumpStage.addMetric(expectedTableMetric);
+    dumpStage.addMetric(expectedFuncMetric);
+    expectedProgress.addStage(dumpStage);
+    ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 0,
+            expectedMetadata);
+    expectedMetric.setProgress(expectedProgress);
+    checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
+            Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+  }
+
   @Test
   public void testFailoverReadyDumpMetrics() throws Exception {
     ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db",
-            "dummyDir", conf);
+            "dummyDir", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
     incrDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
@@ -378,7 +463,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessStageFailure() throws Exception {
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-      "dummyDir", conf);
+      "dummyDir", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -393,7 +478,7 @@ public class TestReplicationMetricCollector {
   @Test
   public void testSuccessStageFailedAdmin() throws Exception {
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
-      "dummyDir", conf);
+      "dummyDir", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
index 1b3cfd3ddec..5059f9af472 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
@@ -94,7 +94,7 @@ public class TestReplicationMetricSink {
     ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector(
       "testAcidTablesReplLoadBootstrapIncr_1592205875387",
         "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
-          + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+          + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
     metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
@@ -157,7 +157,7 @@ public class TestReplicationMetricSink {
     ReplicationMetricCollector incrementDumpMetricCollector = new IncrementalDumpMetricCollector(
       "testAcidTablesReplLoadBootstrapIncr_1592205875387",
       "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
-        + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+        + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf, 0L);
     metricMap = new HashMap<>();
     metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
     incrementDumpMetricCollector.reportStageStart("dump", metricMap);
@@ -217,7 +217,7 @@ public class TestReplicationMetricSink {
     String stagingDir = "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
             + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive/";
     ReplicationMetricCollector failoverDumpMetricCollector = new IncrementalDumpMetricCollector(
-            "testAcidTablesReplLoadBootstrapIncr_1592205875387", stagingDir, conf);
+            "testAcidTablesReplLoadBootstrapIncr_1592205875387", stagingDir, conf, 0L);
     metricMap = new HashMap<String, Long>(){{put(ReplUtils.MetricName.EVENTS.name(), (long) 10);}};
 
     failoverDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
@@ -322,7 +322,7 @@ public class TestReplicationMetricSink {
     ReplicationMetricCollector incrementDumpMetricCollector =
         new IncrementalDumpMetricCollector("testAcidTablesReplLoadBootstrapIncr_1592205875387",
             "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
-                + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+                + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf, 0L);
     Map<String, Long> metricMap = new HashMap<>();
     ReplStatsTracker repl = Mockito.mock(ReplStatsTracker.class);
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
index 72d98c230a5..d8859e3b874 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricUpdateOnFailure.java
@@ -87,7 +87,7 @@ public class TestReplicationMetricUpdateOnFailure {
   public void testReplDumpFailure() throws Exception {
     String dumpDir = TEST_PATH + Path.SEPARATOR + testName.getMethodName();
     IncrementalDumpMetricCollector metricCollector =
-            new IncrementalDumpMetricCollector(null, TEST_PATH, conf);
+            new IncrementalDumpMetricCollector(null, TEST_PATH, conf, 0L);
     ReplDumpWork replDumpWork = Mockito.mock(ReplDumpWork.class);
     Mockito.when(replDumpWork.getCurrentDumpPath()).thenReturn(new Path(dumpDir));
     Mockito.when(replDumpWork.getMetricCollector()).thenReturn(metricCollector);
@@ -109,7 +109,7 @@ public class TestReplicationMetricUpdateOnFailure {
     String dumpDir = TEST_PATH + Path.SEPARATOR + testName.getMethodName();
     MetricCollector.getInstance().deinit();
     BootstrapDumpMetricCollector metricCollector =
-            new BootstrapDumpMetricCollector(null, TEST_PATH, conf);
+            new BootstrapDumpMetricCollector(null, TEST_PATH, conf, 0L);
     ReplDumpWork replDumpWork = Mockito.mock(ReplDumpWork.class);
     Mockito.when(replDumpWork.getMetricCollector()).thenReturn(metricCollector);
     Mockito.when(replDumpWork.getCurrentDumpPath()).thenReturn(new Path(dumpDir));
@@ -128,7 +128,7 @@ public class TestReplicationMetricUpdateOnFailure {
     String dumpDir = TEST_PATH + Path.SEPARATOR + testName.getMethodName();
     MetricCollector.getInstance().deinit();
     IncrementalDumpMetricCollector metricCollector =
-            new IncrementalDumpMetricCollector(null, TEST_PATH, conf);
+            new IncrementalDumpMetricCollector(null, TEST_PATH, conf, 0L);
     ReplDumpWork replDumpWork = Mockito.mock(ReplDumpWork.class);
     Mockito.when(replDumpWork.getCurrentDumpPath()).thenReturn(new Path(dumpDir));
     Mockito.when(replDumpWork.getMetricCollector()).thenReturn(metricCollector);
diff --git a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index 5c9681d3ec1..8bf7c4962cd 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@replication_metrics
 POSTHOOK: Input: sys@replication_metrics_orig
 #### A masked pattern was here ####
-repl1	0	{"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA	{"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name": [...]
+repl1	1	{"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA	{"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name": [...]
 repl2	1	{"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA=	{"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"en [...]