You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2022/02/01 04:10:53 UTC

[hive] branch master updated: HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 86f94bf  HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)
86f94bf is described below

commit 86f94bf1df0db4b5e1c2a46d51d294c882ddb1f3
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Feb 1 09:40:33 2022 +0530

    HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)
---
 .../hive/hcatalog/api/repl/ReplicationUtils.java   |   2 +-
 .../parse/TestReplicationOptimisedBootstrap.java   | 284 +++++++++++++++++++++
 .../hive/ql/parse/TestReplicationScenarios.java    |   6 +-
 .../parse/TestReplicationScenariosAcidTables.java  |   7 -
 .../hive/ql/exec/repl/OptimisedBootstrapUtils.java |  38 ++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   8 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   6 +-
 .../ql/exec/repl/bootstrap/load/LoadDatabase.java  |   9 +-
 .../incremental/IncrementalLoadTasksBuilder.java   |   9 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |   2 +-
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |   5 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |  29 ++-
 .../hive/ql/parse/repl/dump/io/DBSerializer.java   |   2 +-
 .../ql/parse/repl/dump/io/FunctionSerializer.java  |   2 +-
 .../ql/parse/repl/dump/io/PartitionSerializer.java |   2 +-
 .../ql/parse/repl/dump/io/TableSerializer.java     |   2 +-
 .../repl/load/message/AlterDatabaseHandler.java    |   3 +-
 .../apache/hadoop/hive/common/repl/ReplConst.java  |   5 +
 .../apache/hadoop/hive/metastore/HMSHandler.java   |  28 ++
 19 files changed, 393 insertions(+), 56 deletions(-)

diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
index 69e3c13..7514b1a 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
@@ -41,7 +41,7 @@ import java.util.Map;
 
 public class ReplicationUtils {
 
-  public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID.toString();
+  public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString();
 
   private ReplicationUtils(){
     // dummy private constructor, since this class is a collection of static utility methods.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index d5b819d..a63bde6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -21,7 +21,13 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,20 +37,25 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -450,4 +461,277 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     assertTrue("Table Diff Contains " + tableDiffEntries,
         tableDiffEntries.containsAll(Arrays.asList("t1_managed", "t2_managed")));
   }
+
+  @Test
+  public void testTargetEventIdGenerationAfterFirstIncremental() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table table1 (id int)")
+        .run("insert into table table1 values (100)")
+        .run("create  table table1_managed (name string)")
+        .run("insert into table table1_managed values ('ABC')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+    // Check the tables are there post incremental load.
+    replica.run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("select id from table1")
+        .verifyResult("100")
+        .run("select name from table1_managed")
+        .verifyResult("ABC")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table table2_managed (id string)")
+        .run("insert into table table1_managed values ('SDC')")
+        .run("insert into table table2_managed values ('A'),('B'),('C')");
+
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Do some modifications on the target database.
+    replica.run("use " + replicatedDbName)
+        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key1'='value1')")
+        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key2'='value2')");
+
+    // Validate the current replication id on original target has changed now.
+    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    tuple = replica.dump(replicatedDbName);
+
+    // Check event ack file should get created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Get the target event id.
+    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+        .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
+            new DatabaseAndTableFilter(replicatedDbName, null));
+
+    // There should be 4 events, one for alter db, second to remove first incremental pending and then two custom
+    // alter operations.
+    assertEquals(4, nl.getEvents().size());
+  }
+
+  @Test
+  public void testTargetEventIdGeneration() throws Throwable {
+    // Do a cycle of bootstrap dump & load.
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do the first incremental dump.
+    primary.run("use " + primaryDbName)
+        .run("create external table tablei1 (id int)")
+        .run("create external table tablei2 (id int)")
+        .run("create table tablem1 (id int)")
+        .run("create table tablem2 (id int)")
+        .run("insert into table tablei1 values(1),(2),(3),(4)")
+        .run("insert into table tablei2 values(10),(20),(30),(40)")
+        .run("insert into table tablem1 values(5),(10),(15),(20)")
+        .run("insert into table tablem2 values(6),(12),(18),(24)")
+        .dump(primaryDbName, withClause);
+
+    // Do the incremental load, and check everything is intact.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use "+ replicatedDbName)
+        .run("select id from tablei1")
+        .verifyResults(new String[]{"1","2","3","4"})
+        .run("select id from tablei2")
+        .verifyResults(new String[]{"10","20","30","40"})
+        .run("select id from tablem1")
+        .verifyResults(new String[]{"5","10","15","20"})
+        .run("select id from tablem2")
+        .verifyResults(new String[]{"6","12","18","24"});
+
+    // Do some modifications & call for the second cycle of incremental dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table table1 (id int)")
+        .run("insert into table table1 values (25),(35),(82)")
+        .run("create  table table1_managed (name string)")
+        .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')")
+        .run("insert into table tablei1 values(15),(62),(25),(62)")
+        .run("insert into table tablei2 values(10),(22),(11),(22)")
+        .run("insert into table tablem1 values(5),(10),(15),(20)")
+        .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+    // Check the tables are there post incremental load.
+    replica.run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("select id from table1")
+        .verifyResults(new String[]{"25", "35", "82"})
+        .run("select name from table1_managed")
+        .verifyResults(new String[]{"CAD", "DAS", "MSA"})
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table table2_managed (id string)")
+        .run("insert into table table1_managed values ('AAA'),('BBB')")
+        .run("insert into table table2_managed values ('A1'),('B1'),('C2')");
+
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table table1 (id int)")
+        .run("insert into table table1 values (15),(1),(96)")
+        .run("create  table table1_managed (id string)")
+        .run("insert into table table1_managed values ('SAA'),('PSA')");
+
+    // Do some modifications on the target database.
+    replica.run("use " + replicatedDbName)
+        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl1'='value1')")
+        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl2'='value2')");
+
+    // Validate the current replication id on original target has changed now.
+    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse01");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check event ack file should get created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Get the target event id.
+    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+        .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
+            new DatabaseAndTableFilter(replicatedDbName, null));
+
+    assertEquals(1, nl.getEvents().size());
+  }
+
+  @Test
+  public void testTargetEventIdWithNotificationsExpired() throws Throwable {
+    // Do a cycle of bootstrap dump & load.
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do the first incremental dump.
+    primary.run("use " + primaryDbName)
+        .run("create external table tablei1 (id int)")
+        .run("create table tablem1 (id int)")
+        .run("insert into table tablei1 values(1),(2),(3),(4)")
+        .run("insert into table tablem1 values(5),(10),(15),(20)")
+        .dump(primaryDbName, withClause);
+
+    // Do the incremental load, and check everything is intact.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use "+ replicatedDbName)
+        .run("select id from tablei1")
+        .verifyResults(new String[]{"1","2","3","4"})
+        .run("select id from tablem1")
+        .verifyResults(new String[]{"5","10","15","20"});
+
+    // Explicitly make the notification logs incomplete.
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+    // Inject a behaviour where some events are missing from the notification_log table.
+    // This ensures the incremental dump doesn't get all events for replication.
+    InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>
+        eventIdSkipper =
+        new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {
+
+      @Nullable
+      @Override
+      public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
+        if (null != eventIdList) {
+          List<NotificationEvent> eventIds = eventIdList.getEvents();
+          List<NotificationEvent> outEventIds = new ArrayList<>();
+          for (NotificationEvent event : eventIds) {
+            // Skip every event that belongs to the replicated database.
+            if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
+              injectionPathCalled = true;
+              continue;
+            }
+            outEventIds.add(event);
+          }
+
+          // Return the new list
+          return new NotificationEventResponse(outEventIds);
+        } else {
+          return null;
+        }
+      }
+    };
+
+    try {
+      InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
+
+      // Prepare for reverse replication.
+      DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+      Path newReplDir = new Path(replica.repldDir + "reverse01");
+      replicaFs.mkdirs(newReplDir);
+      withClause = ReplicationTestUtils.includeExternalTableClause(true);
+      withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+      try {
+        replica.dump(replicatedDbName, withClause);
+        fail("Expected the dump to fail since the notification event is missing.");
+      } catch (Exception e) {
+        // Expected due to missing notification log entry.
+      }
+
+      // Check if there is a non-recoverable error or not.
+      Path nonRecoverablePath =
+          TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
+      assertTrue(replicaFs.exists(nonRecoverablePath));
+    } finally {
+      InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
+    }
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 61f61a8..b91191e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.StringAppender;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
@@ -132,9 +131,6 @@ import java.util.Base64;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
@@ -4884,7 +4880,7 @@ public class TestReplicationScenarios {
     assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(lastDbReplDumpId));
 
     Table tbl = metaStoreClientMirror.getTable(replDbName, tblName);
-    String tblLastReplId = tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    String tblLastReplId = tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
     assertTrue(Long.parseLong(tblLastReplId) > Long.parseLong(lastDbReplDumpId));
     assertTrue(Long.parseLong(tblLastReplId) <= Long.parseLong(lastReplDumpId));
     return lastReplDumpId;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index b3ddbf5..2612cbf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -934,13 +934,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assert currentEventId == lastEventId + 1;
 
     primary.run("ALTER DATABASE " + primaryDbName +
-            " SET DBPROPERTIES('" + ReplConst.TARGET_OF_REPLICATION + "'='true')");
-    lastEventId = primary.getCurrentNotificationEventId().getEventId();
-    primary.dumpFailure(primaryDbName);
-    currentEventId = primary.getCurrentNotificationEventId().getEventId();
-    assert lastEventId == currentEventId;
-
-    primary.run("ALTER DATABASE " + primaryDbName +
             " SET DBPROPERTIES('" + ReplConst.TARGET_OF_REPLICATION + "'='')");
     primary.dump(primaryDbName);
     replica.run("DROP DATABASE " + replicatedDbName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index 85bbbec..8221a51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getTargetLastReplicatedStateFromParameters;
 
 /**
  * Utility class for handling operations regarding optimised bootstrap in case of replication.
@@ -58,7 +59,7 @@ public class OptimisedBootstrapUtils {
 
   /** Separator used to separate entries in the listing. */
   public static final String FILE_ENTRY_SEPARATOR = "#";
-  private static Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+  private static final Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
 
   /** table diff directory when in progress */
   public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
@@ -87,20 +88,20 @@ public class OptimisedBootstrapUtils {
   }
 
   /**
-   * Gets the event id from the event ack file
+   * Gets the source & target event ids from the event ack file
    * @param dumpPath the dump path
    * @param conf the hive configuration
    * @return the event id from file.
    * @throws IOException
    */
-  public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
+  public static String[] getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
     String lastEventId;
     Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
     FileSystem fs = eventAckFilePath.getFileSystem(conf);
     try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
       lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
     }
-    return lastEventId.replaceAll(System.lineSeparator(),"").trim();
+    return lastEventId.replaceAll(System.lineSeparator(), "").trim().split(FILE_ENTRY_SEPARATOR);
   }
 
   /**
@@ -176,19 +177,20 @@ public class OptimisedBootstrapUtils {
    * @param dmd the dump metadata
    * @param cmRoot the cmRoot
    * @param dbEventId the database event id to which we have to write in the file.
-   * @param conf the hive configuraiton
+   * @param targetDbEventId the database event id with respect to target cluster.
+   * @param conf the hive configuration
    * @param work the repldump work
    * @return the lastReplId denoting a fake dump(-1) always
    * @throws SemanticException
    */
   public static Long createAndGetEventAckFile(Path currentDumpPath, DumpMetaData dmd, Path cmRoot, String dbEventId,
-      HiveConf conf, ReplDumpWork work)
-      throws SemanticException {
+      String targetDbEventId, HiveConf conf, ReplDumpWork work) throws Exception {
     // Keep an invalid value for lastReplId, to denote it isn't a actual dump.
     Long lastReplId = -1L;
     Path filePath = new Path(currentDumpPath, EVENT_ACK_FILE);
-    Utils.writeOutput(dbEventId, filePath, conf);
-    LOG.info("Created event_ack file at {} with eventId {}", filePath, dbEventId);
+    Utils.writeOutput(dbEventId + FILE_ENTRY_SEPARATOR + targetDbEventId, filePath, conf);
+    LOG.info("Created event_ack file at {} with source eventId {} and target eventId {}", filePath, dbEventId,
+        targetDbEventId);
     work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
     dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L, false);
     dmd.write(true);
@@ -245,6 +247,24 @@ public class OptimisedBootstrapUtils {
     LOG.info("Completed renaming table diff progress file to table diff complete file.");
   }
 
+  /**
+   * Fetches the notification id from the database with respect to target database.
+   * @param dbName name of database
+   * @param hiveDb the hive object
+   * @return the corresponding notification event id from target database
+   * @throws Exception
+   */
+  public static String getTargetEventId(String dbName, Hive hiveDb) throws Exception {
+    Database database = hiveDb.getDatabase(dbName);
+    String targetLastEventId = getTargetLastReplicatedStateFromParameters(database.getParameters());
+    List<NotificationEvent> events =
+        hiveDb.getMSC().getNextNotification(Long.parseLong(targetLastEventId) - 1, 1, null).getEvents();
+    if (events == null || events.isEmpty() || events.get(0).getEventId() != Long.parseLong(targetLastEventId)) {
+      throw new IllegalStateException("Notification events are missing in the meta store.");
+    }
+    return targetLastEventId;
+  }
+
   private static ArrayList<String> getListing(String dbName, String tableName, Hive hiveDb, HiveConf conf)
       throws HiveException, IOException {
     ArrayList<String> paths = new ArrayList<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index a9d728d..1fcce45 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -131,6 +131,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkF
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.createAndGetEventAckFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getReplEventIdFromDatabase;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTargetEventId;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFailover;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirstIncrementalPending;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
@@ -229,11 +230,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             isFirstIncrementalPending(work.dbNameOrPattern, getHive());
             // Get the last replicated event id from the database.
             String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
+            // Get the last replicated event id from the database with respect to target.
+            String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
             // Check if the tableDiff directory is present or not.
             boolean isTableDiffDirectoryPresent = checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
             if (createEventMarker) {
               LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
-              lastReplId = createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, conf, work);
+              lastReplId =
+                  createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, targetDbEventId, conf, work);
               finishRemainingTasks();
             } else {
               // We should be here only if TableDiff is Present.
@@ -548,7 +552,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         return true;
       }
       // Event_ACK file is present check if it contains correct value or not.
-      String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf);
+      String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf)[0];
       String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive()).trim();
       if (!dbEventId.equalsIgnoreCase(fileEventId)) {
         // In case the database event id changed post table_diff_complete generation, that means both forward &
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3a2dda9..2f249aa7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -598,7 +598,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     Map<String, String> dbProps;
     if (work.isIncrementalLoad()) {
       dbProps = new HashMap<>();
-      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
           work.incrementalLoadTasksBuilder().eventTo().toString());
     } else {
       Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
@@ -699,7 +699,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       }
       boolean isTableDiffPresent =
           checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
-      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf));
+      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
       if (!isTableDiffPresent) {
         prepareTableDiffFile(eventId, getHive(), work, conf);
         if (this.childTasks == null) {
@@ -761,7 +761,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       if (StringUtils.isNotBlank(dbName)) {
         String lastEventid = builder.eventTo().toString();
         Map<String, String> mapProp = new HashMap<>();
-        mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
+        mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), lastEventid);
         AlterDatabaseSetPropertiesDesc alterDbDesc =
             new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
                 new ReplicationSpec(lastEventid, lastEventid));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index ba7979d..06264f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 public class LoadDatabase {
@@ -141,8 +140,8 @@ public class LoadDatabase {
   private boolean isDbAlreadyBootstrapped(Database db) {
     Map<String, String> props = db.getParameters();
     return ((props != null)
-            && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString())
-            && !props.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()).isEmpty());
+            && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
+            && !props.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()).isEmpty());
   }
 
   private boolean isDbEmpty(String dbName) throws HiveException {
@@ -181,7 +180,9 @@ public class LoadDatabase {
     last repl id is set and we create a AlterDatabaseTask at the end of processing a database.
      */
     Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
-    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
+
+    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID_TARGET.toString());
 
     parameters.remove(ReplUtils.REPL_IS_CUSTOM_DB_LOC);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 0eabc1c..c9f0da4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.slf4j.Logger;
-import org.stringtemplate.v4.ST;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -177,7 +176,7 @@ public class IncrementalLoadTasksBuilder {
       taskChainTail = updateIncPendTask;
 
       Map<String, String> dbProps = new HashMap<>();
-      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
+      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), String.valueOf(lastReplayedEvent));
       ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps, dumpDirectory,
               metricCollector, shouldFailover);
       Task<?> barrierTask = TaskFactory.get(replStateLogWork, conf);
@@ -196,8 +195,8 @@ public class IncrementalLoadTasksBuilder {
   }
 
   private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
-    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()))) {
+      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
       if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
         log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
                 + " is already replayed. LastReplId - " +  Long.parseLong(replLastId));
@@ -242,7 +241,7 @@ public class IncrementalLoadTasksBuilder {
 
   private Task<?> dbUpdateReplStateTask(String dbName, String replState, Task<?> preCursor) {
     HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), replState);
 
     AlterDatabaseSetPropertiesDesc alterDbDesc = new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
         new ReplicationSpec(replState, replState));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 18cdab5..816153e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -841,7 +841,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     } else {
       // verify if table has been the target of replication, and if so, check HiveConf if we're allowed
       // to override. If not, fail.
-      if (table.getParameters().containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+      if (table.getParameters().containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
           && conf.getBoolVar(HiveConf.ConfVars.HIVE_EXIM_RESTRICT_IMPORTS_INTO_REPLICATED_TABLES)){
             throw new SemanticException(ErrorMsg.IMPORT_INTO_STRICT_REPL_TABLE.getMsg(
                 "Table "+table.getTableName()+" has repl.last.id parameter set." ));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 0b9eb57..17472c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -55,7 +55,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
-import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
@@ -445,8 +444,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (database != null) {
         inputs.add(new ReadEntity(database));
         Map<String, String> params = database.getParameters();
-        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-          return params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()))) {
+          return params.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
         }
       }
     } catch (HiveException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 37039c4..dd2224c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -18,14 +18,13 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import org.apache.hadoop.hive.common.repl.ReplConst;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DATABASE_PROPERTY;
+
 /**
  * Statements executed to handle replication have some additional
  * information relevant to the replication subsystem - this class
@@ -59,11 +58,12 @@ public class ReplicationSpec {
   public enum KEY {
     REPL_SCOPE("repl.scope"),
     EVENT_ID("repl.event.id"),
-    CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY),
+    CURR_STATE_ID_SOURCE(ReplConst.REPL_TARGET_TABLE_PROPERTY),
     NOOP("repl.noop"),
     IS_REPLACE("repl.is.replace"),
     VALID_WRITEID_LIST("repl.valid.writeid.list"),
-    VALID_TXN_LIST("repl.valid.txnid.list")
+    VALID_TXN_LIST("repl.valid.txnid.list"),
+    CURR_STATE_ID_TARGET(REPL_TARGET_DATABASE_PROPERTY),
     ;
     private final String keyName;
 
@@ -146,7 +146,7 @@ public class ReplicationSpec {
       }
     }
     this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
-    this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
     this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
     this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
     this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString());
@@ -230,8 +230,15 @@ public class ReplicationSpec {
   }
 
   public static String getLastReplicatedStateFromParameters(Map<String, String> m) {
-    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){
-      return m.get(KEY.CURR_STATE_ID.toString());
+    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID_SOURCE.toString()))){
+      return m.get(KEY.CURR_STATE_ID_SOURCE.toString());
+    }
+    return null;
+  }
+
+  public static String getTargetLastReplicatedStateFromParameters(Map<String, String> m) {
+    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID_TARGET.toString()))){
+      return m.get(KEY.CURR_STATE_ID_TARGET.toString());
     }
     return null;
   }
@@ -360,7 +367,7 @@ public class ReplicationSpec {
         }
       case EVENT_ID:
         return getReplicationState();
-      case CURR_STATE_ID:
+      case CURR_STATE_ID_SOURCE:
         return getCurrentReplicationState();
       case NOOP:
         return String.valueOf(isNoop());
@@ -388,9 +395,9 @@ public class ReplicationSpec {
 
 
   public static void copyLastReplId(Map<String, String> srcParameter, Map<String, String> destParameter) {
-    String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
     if (lastReplId != null) {
-      destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastReplId);
+      destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), lastReplId);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
index 06b8fd5..429b9d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
@@ -39,7 +39,7 @@ public class DBSerializer implements JsonWriter.Serializer {
   public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
     dbObject.putToParameters(
-        ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+        ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
         additionalPropertiesProvider.getCurrentReplicationState()
     );
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index c340df7..fc01906 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -94,7 +94,7 @@ public class FunctionSerializer implements JsonWriter.Serializer {
       //This is required otherwise correct work object on repl load wont be created.
       writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.REPL_SCOPE.toString(),
           "all");
-      writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+      writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
           additionalPropertiesProvider.getCurrentReplicationState());
       writer.jsonGenerator
           .writeStringField(FIELD_NAME, serializer.toString(copyObj));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 12c183e..d198612 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -57,7 +57,7 @@ public class PartitionSerializer implements JsonWriter.Serializer {
         if (additionalPropertiesProvider.getReplSpecType()
                 != ReplicationSpec.Type.INCREMENTAL_DUMP) {
           partition.putToParameters(
-                  ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+                  ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
                   additionalPropertiesProvider.getCurrentReplicationState());
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 16c371a..8c52194 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -86,7 +86,7 @@ public class TableSerializer implements JsonWriter.Serializer {
       if (additionalPropertiesProvider.getReplSpecType()
               != ReplicationSpec.Type.INCREMENTAL_DUMP) {
         table.putToParameters(
-                ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+                ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
                 additionalPropertiesProvider.getCurrentReplicationState());
       }
     } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index 65841c3..2194f39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -64,7 +64,8 @@ public class AlterDatabaseHandler extends AbstractMessageHandler {
           String key = entry.getKey();
           // Ignore the keys which are local to source warehouse
           if (key.startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
-                  || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+                  || key.equals(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
+                  || key.equals(ReplicationSpec.KEY.CURR_STATE_ID_TARGET.toString())
                   || key.equals(ReplUtils.REPL_CHECKPOINT_KEY)
                   || key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)
                   || key.equals(ReplUtils.REPL_FIRST_INC_PENDING_FLAG)
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index 46dbc3e..f5131ca 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -53,4 +53,9 @@ public class ReplConst {
   public static final String TARGET_OF_REPLICATION = "repl.target.for";
 
   public static final String REPL_INCOMPATIBLE = "repl.incompatible";
+
+  /**
+   * Database parameter storing the target cluster's notification event id as of the last repl.last.id update.
+   */
+  public static final String REPL_TARGET_DATABASE_PROPERTY = "repl.target.last.id";
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index d186c4f..d8a3da4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -110,6 +110,7 @@ import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFI
 import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
 import static org.apache.hadoop.hive.common.AcidConstants.DELTA_DIGITS;
 
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DATABASE_PROPERTY;
 import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
 import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
@@ -1581,6 +1582,16 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
         throw new MetaException("Could not alter database \"" + parsedDbName[DB_NAME] +
             "\". Could not retrieve old definition.");
       }
+
+      // repl.last.id is changing: also record the metastore's current notification event id on the database.
+      if (isReplicationEventIdUpdate(oldDB, newDB)) {
+        Map<String, String> oldParams = new LinkedHashMap<>(newDB.getParameters());
+        String currentNotificationLogID = Long.toString(ms.getCurrentNotificationEventId().getEventId());
+        oldParams.put(REPL_TARGET_DATABASE_PROPERTY, currentNotificationLogID);
+        LOG.debug("Adding the {} property for database {} with event id {}", REPL_TARGET_DATABASE_PROPERTY,
+            newDB.getName(), currentNotificationLogID);
+        newDB.setParameters(oldParams);
+      }
       firePreEvent(new PreAlterDatabaseEvent(oldDB, newDB, this));
 
       ms.openTransaction();
@@ -1613,6 +1624,23 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
     }
   }
 
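+  /**
+   * Checks whether an alter database request changes the repl.last.id parameter.
+   * @param oldDb the old db object; its parameters may be null
+   * @param newDb the new db object; null or empty parameters mean no update
+   * @return true if newDb has a repl.last.id that differs (ignoring case) from the one in oldDb.
+   */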
+  private boolean isReplicationEventIdUpdate(Database oldDb, Database newDb) {
+    Map<String, String> oldDbProp = oldDb.getParameters();
+    Map<String, String> newDbProp = newDb.getParameters();
+    if (newDbProp == null || newDbProp.isEmpty()) {
+      return false;
+    }
+    String newReplId = newDbProp.get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
+    String oldReplId = oldDbProp != null ? oldDbProp.get(ReplConst.REPL_TARGET_TABLE_PROPERTY) : null;
+    return newReplId != null && !newReplId.equalsIgnoreCase(oldReplId);
+  }
+
   private void drop_database_core(RawStore ms, String catName,
                                   final String name, final boolean deleteData, final boolean cascade)
       throws NoSuchObjectException, InvalidOperationException, MetaException,