Posted to commits@hive.apache.org by ay...@apache.org on 2022/03/21 06:53:51 UTC

[hive] branch master updated: HIVE-25814: Add entry in replication_metrics table for skipped replication. (#2907). (Haymant Mangla, reviewed by Ayush Saxena)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 57978ad  HIVE-25814: Add entry in replication_metrics table for skipped replication. (#2907). (Haymant Mangla, reviewed by Ayush Saxena)
57978ad is described below

commit 57978adbfd26ff6cd6b362b90a1c7c39d2a63bf2
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Mon Mar 21 12:23:28 2022 +0530

    HIVE-25814: Add entry in replication_metrics table for skipped replication. (#2907). (Haymant Mangla, reviewed by Ayush Saxena)
---
 .../hive/ql/parse/TestReplicationScenarios.java    | 103 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   7 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  14 +++
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |  31 +++++--
 .../repl/metric/ReplicationMetricCollector.java    |   9 +-
 .../hive/ql/parse/repl/metric/event/Status.java    |   3 +-
 .../clientpositive/replication_metrics_ingest.q    |   7 +-
 .../llap/replication_metrics_ingest.q.out          |  30 +++---
 8 files changed, 173 insertions(+), 31 deletions(-)
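
For orientation before the diff: the core of this change is a small helper that writes a single synthetic stage into the replication metrics whenever an iteration is skipped (or fails before any real dump/load work starts). A condensed sketch of that mechanism, lifted from the ReplUtils hunk below — it relies on an anonymous subclass because ReplicationMetricCollector is abstract; treat it as illustrative rather than a drop-in:

    // Report a one-shot status (e.g. Status.SKIPPED) with no real dump/load in flight.
    ReplicationMetricCollector metricCollector =
            new ReplicationMetricCollector(null, null, null, 0, conf) {};  // throwaway collector
    metricCollector.reportStageStart(stageName, new HashMap<>());          // open a synthetic stage
    metricCollector.reportStageEnd(stageName, status, errorLogPath);       // close it with the final status

Because SKIPPED is now treated as a terminal status (see the ReplicationMetricCollector hunk further down), this single start/end pair is enough to persist a complete entry in the replication_metrics table.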

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b91191e..b630f7d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -25,13 +25,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.metadata.StringAppender;
 import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
@@ -4611,6 +4614,99 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testReplicationMetricForSkippedIteration() throws Throwable {
+    String name = testName.getMethodName();
+    String primaryDbName = createDB(name, driver);
+    String replicaDbName = "replicaDb";
+    try {
+      isMetricsEnabledForTests(true);
+      MetricCollector collector = MetricCollector.getInstance();
+      run("create table " + primaryDbName + ".t1 (id int) STORED AS TEXTFILE", driver);
+      run("insert into " + primaryDbName + ".t1 values(1)", driver);
+      run("repl dump " + primaryDbName, driver);
+
+      ReplicationMetric metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SUCCESS);
+
+      run("repl dump " + primaryDbName, driver);
+
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SKIPPED);
+
+      run("repl load " + primaryDbName + " into " + replicaDbName, driverMirror);
+
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SUCCESS);
+
+      run("repl load " + primaryDbName + " into " + replicaDbName, driverMirror);
+
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SKIPPED);
+    } finally {
+      isMetricsEnabledForTests(false);
+    }
+  }
+
+  @Test
+  public void testReplicationMetricForFailedIteration() throws Throwable {
+    String name = testName.getMethodName();
+    String primaryDbName = createDB(name, driver);
+    String replicaDbName = "tgtDb";
+    try {
+      isMetricsEnabledForTests(true);
+      MetricCollector collector = MetricCollector.getInstance();
+      run("create table " + primaryDbName + ".t1 (id int) STORED AS TEXTFILE", driver);
+      run("insert into " + primaryDbName + ".t1 values(1)", driver);
+      Tuple dumpData = replDumpDb(primaryDbName);
+
+      ReplicationMetric metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SUCCESS);
+
+      run("repl load " + primaryDbName + " into " + replicaDbName, driverMirror);
+
+      Path nonRecoverableFile = new Path(new Path(dumpData.dumpLocation), ReplAck.NON_RECOVERABLE_MARKER.toString());
+      FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(hconf);
+      fs.create(nonRecoverableFile);
+
+      verifyFail("REPL DUMP " + primaryDbName, driver);
+
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SKIPPED);
+      assertEquals(metric.getProgress().getStages().get(0).getErrorLogPath(), nonRecoverableFile.toString());
+
+      verifyFail("REPL DUMP " + primaryDbName, driver);
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SKIPPED);
+      assertEquals(metric.getProgress().getStages().get(0).getErrorLogPath(), nonRecoverableFile.toString());
+
+      fs.delete(nonRecoverableFile, true);
+      dumpData = replDumpDb(primaryDbName);
+
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SUCCESS);
+
+      run("ALTER DATABASE " + replicaDbName + " SET DBPROPERTIES('" + ReplConst.REPL_INCOMPATIBLE + "'='true')", driverMirror);
+
+      verifyFail("REPL LOAD " + primaryDbName + " INTO " + replicaDbName, driverMirror);
+
+      nonRecoverableFile = new Path(new Path(dumpData.dumpLocation), ReplAck.NON_RECOVERABLE_MARKER.toString());
+      assertTrue(fs.exists(nonRecoverableFile));
+
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.FAILED_ADMIN);
+      assertEquals(metric.getProgress().getStages().get(0).getErrorLogPath(), nonRecoverableFile.toString());
+
+      verifyFail("REPL LOAD " + primaryDbName + " INTO " + replicaDbName, driverMirror);
+
+      metric = collector.getMetrics().getLast();
+      assertEquals(metric.getProgress().getStatus(), Status.SKIPPED);
+      assertEquals(metric.getProgress().getStages().get(0).getErrorLogPath(), nonRecoverableFile.toString());
+    } finally {
+      isMetricsEnabledForTests(false);
+    }
+  }
+
+  @Test
   public void testAddPartition() throws Throwable{
     // Get the logger at the root level.
     Logger logger = LogManager.getLogger("hive.ql.metadata.Hive");
@@ -4715,6 +4811,7 @@ public class TestReplicationScenarios {
 
     // Do a bootstrap dump & load
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
+    collector.getMetrics();
     ReplLoadWork.setMbeansParamsForTesting(true,true);
 
     // Do some operations at the source side so that the count & metrics can be counted at the load side.
@@ -4742,6 +4839,9 @@ public class TestReplicationScenarios {
       List<Stage> stages = elem.getProgress().getStages();
       assertTrue(stages.size() != 0);
       for (Stage stage : stages) {
+        if (stage.getReplStats() == null) {
+          continue;
+        }
         for (String event : events) {
           assertTrue(stage.getReplStats(), stage.getReplStats().contains(event));
         }
@@ -4785,6 +4885,9 @@ public class TestReplicationScenarios {
       List<Stage> stages = elem.getProgress().getStages();
       assertTrue(stages.size() != 0);
       for (Stage stage : stages) {
+        if (stage.getReplStats() == null) {
+          continue;
+        }
         for (String event : events) {
           assertTrue(stage.getReplStats(), stage.getReplStats().contains(event));
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 90eec0b..339937f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -140,6 +140,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTar
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFailover;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirstIncrementalPending;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.cleanupSnapshots;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.getDFS;
@@ -189,8 +190,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         initiateDataCopyTasks();
       } else {
         Path dumpRoot = ReplUtils.getEncodedDumpRootPath(conf, work.dbNameOrPattern.toLowerCase());
-        if (ReplUtils.failedWithNonRecoverableError(ReplUtils.getLatestDumpPath(dumpRoot, conf), conf)) {
+        Path latestDumpPath = ReplUtils.getLatestDumpPath(dumpRoot, conf);
+        if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
           LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. ");
+          Path nonRecoverableFile = new Path(latestDumpPath, NON_RECOVERABLE_MARKER.toString());
+          ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, nonRecoverableFile.toString(), conf);
           setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format()));
           return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode();
         }
@@ -291,6 +295,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           } else {
             LOG.info("Previous Dump is not yet loaded. Skipping this iteration.");
           }
+          ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf);
         }
       }
     } catch (RuntimeException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index c84641a..038c629 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -474,4 +474,18 @@ public class ReplUtils {
       DAGTraversal.traverse(tasks, new AddDependencyToLeaves(Collections.singletonList(task)));
     }
   }
+
+  public static void reportStatusInReplicationMetrics(String stageName, Status status, String errorLogPath,
+                                                      HiveConf conf)
+          throws SemanticException {
+    ReplicationMetricCollector metricCollector =
+            new ReplicationMetricCollector(null, null, null, 0, conf) {};
+    metricCollector.reportStageStart(stageName, new HashMap<>());
+    metricCollector.reportStageEnd(stageName, status, errorLogPath);
+  }
+
+  public static boolean isErrorRecoverable(Throwable e) {
+    int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    return errorCode > ErrorMsg.GENERIC_ERROR.getErrorCode();
+  }
 }
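
A hypothetical caller, to make the intended use of the two new ReplUtils helpers concrete. The method names and signatures come from the hunk above; the wrapper class itself is invented for illustration and is not part of the commit:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;

    class SkipReporting {  // hypothetical wrapper for illustration only
      // Record a skipped REPL_DUMP iteration; errorLogPath may be null when
      // there is no non-recoverable marker file to point operators at.
      static void reportSkippedDump(HiveConf conf) throws SemanticException {
        ReplUtils.reportStatusInReplicationMetrics("REPL_DUMP", Status.SKIPPED, null, conf);
      }

      // The same failure classification the ReplicationSemanticAnalyzer change
      // below applies: errors whose code exceeds GENERIC_ERROR are routed to
      // FAILED_ADMIN, everything else to FAILED.
      static Status classify(Throwable t) {
        return ReplUtils.isErrorRecoverable(t) ? Status.FAILED_ADMIN : Status.FAILED;
      }
    }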
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 17472c4..ccddaea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
 import java.io.IOException;
@@ -52,7 +53,6 @@ import java.util.List;
 import java.util.Collections;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -93,12 +93,26 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     switch (ast.getToken().getType()) {
       case TOK_REPL_DUMP: {
         LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
-        analyzeReplDump(ast);
+        try {
+          analyzeReplDump(ast);
+        } catch (SemanticException e) {
+          ReplUtils.reportStatusInReplicationMetrics("REPL_DUMP", ReplUtils.isErrorRecoverable(e)
+                  ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+          throw e;
+        }
         break;
       }
       case TOK_REPL_LOAD: {
         LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
-        analyzeReplLoad(ast);
+        try {
+          analyzeReplLoad(ast);
+        } catch (SemanticException e) {
+          if (!e.getMessage().equals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg())) {
+            ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", ReplUtils.isErrorRecoverable(e)
+                    ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+          }
+          throw e;
+        }
         break;
       }
       case TOK_REPL_STATUS: {
@@ -143,7 +157,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private void initReplDump(ASTNode ast) throws HiveException {
     int numChildren = ast.getChildCount();
-    boolean isMetaDataOnly = false;
 
     String dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
     LOG.info("Current ReplScope: Set DB Name: {}", dbNameOrPattern);
@@ -160,7 +173,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           for (Map.Entry<String, String> config : replConfigs.entrySet()) {
             conf.set(config.getKey(), config.getValue());
           }
-          isMetaDataOnly = HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY);
         }
         break;
       case TOK_REPL_TABLES:
@@ -319,8 +331,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // tells us what is inside that dumpdir.
 
       //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
-      if (ReplUtils.failedWithNonRecoverableError(ReplUtils.getLatestDumpPath(ReplUtils
-        .getEncodedDumpRootPath(conf, sourceDbNameOrPattern.toLowerCase()), conf), conf)) {
+      Path latestDumpPath = ReplUtils.getLatestDumpPath(ReplUtils
+              .getEncodedDumpRootPath(conf, sourceDbNameOrPattern.toLowerCase()), conf);
+      if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
+        Path nonRecoverableFile = new Path(latestDumpPath, ReplAck.NON_RECOVERABLE_MARKER.toString());
+        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED,
+                nonRecoverableFile.toString(), conf);
         throw new Exception(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg());
       }
       if (loadPath != null) {
@@ -342,6 +358,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
               dmd.getDumpExecutionId()), dmd.isReplScopeModified());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
       } else {
+        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf);
         LOG.warn("Previous Dump Already Loaded");
       }
     } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index 27d9f7b..5e5639b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -50,6 +50,7 @@ public abstract class ReplicationMetricCollector {
   private MetricCollector metricCollector;
   private boolean isEnabled;
   private static boolean enableForTests;
+  private static long scheduledExecutionIdForTests = 0L;
   private HiveConf conf;
 
   public void setMetricsMBean(ObjectName metricsMBean) {
@@ -149,11 +150,13 @@ public abstract class ReplicationMetricCollector {
       }
       stage.setStatus(status);
       stage.setEndTime(getCurrentTimeInMillis());
-      stage.setErrorLogPath(errorLogPath);
+      if (errorLogPath != null) {
+        stage.setErrorLogPath(errorLogPath);
+      }
       progress.addStage(stage);
       replicationMetric.setProgress(progress);
       metricCollector.addMetric(replicationMetric);
-      if (Status.FAILED == status || Status.FAILED_ADMIN == status) {
+      if (Status.FAILED == status || Status.FAILED_ADMIN == status || Status.SKIPPED == status) {
         reportEnd(status);
       }
     }
@@ -214,7 +217,7 @@ public abstract class ReplicationMetricCollector {
   private void checkEnabledForTests(HiveConf conf) {
     if (enableForTests) {
       conf.set(Constants.SCHEDULED_QUERY_SCHEDULENAME, "pol");
-      conf.setLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 1L);
+      conf.setLong(Constants.SCHEDULED_QUERY_EXECUTIONID, ++scheduledExecutionIdForTests);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
index 91430f0..70a7f1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
@@ -27,5 +27,6 @@ public enum Status {
   IN_PROGRESS,
   FAILED_ADMIN,
   FAILOVER_IN_PROGRESS,
-  FAILOVER_READY
+  FAILOVER_READY,
+  SKIPPED
 }
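
With SKIPPED added to the Status enum here and to the terminal-state check in reportStageEnd above, a skipped iteration finalizes its metric immediately instead of lingering in progress. A tiny hypothetical predicate (not in the commit) expressing the check as it stands after this change:

    // Statuses that make ReplicationMetricCollector.reportStageEnd call reportEnd(status).
    static boolean endsMetric(Status s) {
      return s == Status.FAILED || s == Status.FAILED_ADMIN || s == Status.SKIPPED;
    }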
diff --git a/ql/src/test/queries/clientpositive/replication_metrics_ingest.q b/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
index 35e7796..917bb50 100644
--- a/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
+++ b/ql/src/test/queries/clientpositive/replication_metrics_ingest.q
@@ -39,7 +39,8 @@ alter scheduled query repl2 disabled;
 
 show databases;
 
-select POLICY_NAME, DUMP_EXECUTION_ID, METADATA, PROGRESS, MESSAGE_FORMAT
-from sys.replication_metrics_orig order by dump_execution_id;
+use sys;
 
-select POLICY_NAME, DUMP_EXECUTION_ID, METADATA, PROGRESS from sys.replication_metrics order by dump_execution_id;
+select t1.POLICY_NAME, t1.DUMP_EXECUTION_ID, t1.METADATA, t1.PROGRESS, t2.PROGRESS, t1.MESSAGE_FORMAT
+from replication_metrics_orig as t1 join replication_metrics as t2 where
+t1.scheduled_execution_id=t2.scheduled_execution_id AND t2.progress not like ('%SKIPPED%') order by t1.dump_execution_id;
diff --git a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index 8416382..5c9681d 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -72,27 +72,25 @@ destination
 information_schema
 src
 sys
-PREHOOK: query: select POLICY_NAME, DUMP_EXECUTION_ID, METADATA, PROGRESS, MESSAGE_FORMAT
-from sys.replication_metrics_orig order by dump_execution_id
-PREHOOK: type: QUERY
-PREHOOK: Input: sys@replication_metrics_orig
-#### A masked pattern was here ####
-POSTHOOK: query: select POLICY_NAME, DUMP_EXECUTION_ID, METADATA, PROGRESS, MESSAGE_FORMAT
-from sys.replication_metrics_orig order by dump_execution_id
-POSTHOOK: type: QUERY
-POSTHOOK: Input: sys@replication_metrics_orig
-#### A masked pattern was here ####
-repl1	0	{"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA	gzip(json-2.0)
-repl2	1	{"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA=	gzip(json-2.0)
-PREHOOK: query: select POLICY_NAME, DUMP_EXECUTION_ID, METADATA, PROGRESS from sys.replication_metrics order by dump_execution_id
+PREHOOK: query: use sys
+PREHOOK: type: SWITCHDATABASE
+PREHOOK: Input: database:sys
+POSTHOOK: query: use sys
+POSTHOOK: type: SWITCHDATABASE
+POSTHOOK: Input: database:sys
+PREHOOK: query: select t1.POLICY_NAME, t1.DUMP_EXECUTION_ID, t1.METADATA, t1.PROGRESS, t2.PROGRESS, t1.MESSAGE_FORMAT
+from replication_metrics_orig as t1 join replication_metrics as t2 where
+t1.scheduled_execution_id=t2.scheduled_execution_id AND t2.progress not like ('%SKIPPED%') order by t1.dump_execution_id
 PREHOOK: type: QUERY
 PREHOOK: Input: sys@replication_metrics
 PREHOOK: Input: sys@replication_metrics_orig
 #### A masked pattern was here ####
-POSTHOOK: query: select POLICY_NAME, DUMP_EXECUTION_ID, METADATA, PROGRESS from sys.replication_metrics order by dump_execution_id
+POSTHOOK: query: select t1.POLICY_NAME, t1.DUMP_EXECUTION_ID, t1.METADATA, t1.PROGRESS, t2.PROGRESS, t1.MESSAGE_FORMAT
+from replication_metrics_orig as t1 join replication_metrics as t2 where
+t1.scheduled_execution_id=t2.scheduled_execution_id AND t2.progress not like ('%SKIPPED%') order by t1.dump_execution_id
 POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@replication_metrics
 POSTHOOK: Input: sys@replication_metrics_orig
 #### A masked pattern was here ####
-repl1	0	{"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	{"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":null,"replStats":null}]}
-repl2	1	{"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	{"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":{"numCreated":0,"numDeleted":0},"replStats":null}]}
+repl1	0	{"dbName":"src","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA	{"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name": [...]
+repl2	1	{"dbName":"destination","replicationType":"BOOTSTRAP","stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0}	H4sIAAAAAAAAAG2PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA=	{"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"en [...]