You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2021/12/17 04:55:37 UTC

[hive] branch master updated: HIVE-25708: Implement creation of table_diff. (Ayush Saxena, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a07c3cc  HIVE-25708: Implement creation of table_diff. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
a07c3cc is described below

commit a07c3ccb1bff733ff41f6c38947f406aebc11353
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Fri Dec 17 10:25:18 2021 +0530

    HIVE-25708: Implement creation of table_diff. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
---
 .../java/org/apache/hadoop/hive/ql/ErrorMsg.java   |   3 +-
 .../parse/TestReplicationOptimisedBootstrap.java   | 453 +++++++++++++++++++++
 .../parse/TestReplicationScenariosAcidTables.java  |  24 --
 .../TestReplicationScenariosAcrossInstances.java   |   7 +-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |   4 +-
 .../hive/ql/exec/repl/OptimisedBootstrapUtils.java | 294 +++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  82 +++-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  29 ++
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |   4 +
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |   4 +-
 10 files changed, 860 insertions(+), 44 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 6220175..927650f 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -633,7 +633,8 @@ public enum ErrorMsg {
   REPL_DISTCP_SNAPSHOT_EXCEPTION(40015, "SNAPSHOT_ERROR", true),
   RANGER_AUTHORIZATION_FAILED(40016, "Authorization Failure while communicating to Ranger admin", true),
   RANGER_AUTHENTICATION_FAILED(40017, "Authentication Failure while communicating to Ranger admin", true),
-  REPL_INCOMPATIBLE_EXCEPTION(40018, "Cannot load into database {0} as it is replication incompatible.", true)
+  REPL_INCOMPATIBLE_EXCEPTION(40018, "Cannot load into database {0} as it is replication incompatible.", true),
+  REPL_FAILOVER_TARGET_MODIFIED(40019,"Database event id changed post table diff generation.")
   ;
 
   private int errorCode;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
new file mode 100644
index 0000000..d5b819d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName());
+    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("insert into table t2_managed partition(country='france') values ('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exists.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified tables, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", dumpPath, conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", dumpPath, conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", dumpPath, conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", dumpPath, conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", dumpPath, conf).isEmpty());
+    assertTrue(getPathsFromTableFile("t6_managed", dumpPath, conf).size() == 1);
+  }
+
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+
+    // In case of control failover both A & B will be in sync, so the table diff should be created empty, without any
+    // error.
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and see all the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Trigger reverse cycle. Do dump on target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Even though no diff, the event ack file should be created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a reverse load.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory still gets created.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff is empty, since we are in sync, so no tables got modified.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with first incremental dump pending : " + replicatedDbName));
+    }
+
+    // Do an incremental cycle and check we don't get this exception.
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Retrigger reverse dump, this time it should be successful and event ack should get created.
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check event ack file should get created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+  }
+
+  @Test
+  public void testFailureCasesInTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('A')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and check the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id string)")
+        .run("insert into table t1_managed values ('S')")
+        .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Trigger dump on target cluster.
+
+    replicaFs.setQuota(newReplDir, 1, 10000);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignore; this is expected due to the quota violation.
+    }
+    // Check the event_ack file doesn't exist.
+    assertFalse("event ack file exists despite quota violation", replicaFs.listFiles(newReplDir, true).hasNext());
+
+    // Set the quota to a value that makes sure event ack file gets created and then fails
+    replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 3, QUOTA_RESET);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignore; this is expected due to the quota violation.
+    }
+
+    // Check the event ack file got created despite exception and failure.
+    assertEquals("event_ack", replicaFs.listFiles(newReplDir, true).next().getPath().getName());
+
+    // Remove quota for a successful dump
+    replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+
+    // Retry Dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check event ack file is there.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Set quota again to restrict creation of table diff in middle during load.
+    replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 2, QUOTA_RESET);
+
+    try {
+      primary.load(primaryDbName, replicatedDbName, withClause);
+    } catch (Exception e) {
+      // Ignore, expected due to quota violation.
+    }
+
+    // Check table diff in progress directory gets created.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+    // Check table diff complete directory doesn't get created.
+    assertFalse(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Set Quota to a value so that table diff complete gets created and we fail post that.
+    replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 1, QUOTA_RESET);
+    try {
+      primary.load(primaryDbName, replicatedDbName, withClause);
+      fail("Expected failure due to quota violation");
+    } catch (Exception e) {
+      // Ignore, expected due to quota violation.
+    }
+
+    // Check table diff complete directory gets created.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Remove the quota and see everything recovers.
+    replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check table diff in complete directory gets created.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check table diff in progress directory isn't there now.
+    assertFalse(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+    // Check the entries in table diff are correct.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries,
+        tableDiffEntries.containsAll(Arrays.asList("t1_managed", "t2_managed")));
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index d05ff85..b3ddbf5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -3031,30 +3031,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
   }
 
-  @Test
-  public void testReplTargetOfReplication() throws Throwable {
-    // Bootstrap
-    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, primaryDbName).verifyReplTargetProperty(replicatedDbName);
-    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
-
-    //Try to do a dump on replicated db. It should fail
-    replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='1')");
-    try {
-      replica.dump(replicatedDbName);
-    } catch (Exception e) {
-      Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage());
-      Assert.assertEquals(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getErrorCode(),
-        ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
-    }
-    replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')");
-
-    //Try to dump a different db on replica. That should succeed
-    replica.run("create database " + replicatedDbName + "_extra with dbproperties ('repl.source.for' = '1, 2, 3')")
-      .dump(replicatedDbName + "_extra");
-    replica.run("drop database if exists " + replicatedDbName + "_extra cascade");
-  }
-
   private void verifyPathExist(FileSystem fs, Path filePath) throws IOException {
     assertTrue("Path not found:" + filePath, fs.exists(filePath));
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 37c3970..d9c63c6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1084,10 +1084,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .verifyResult(tuplePrimary.lastReplicationId)
             .run("show tblproperties t1('custom.property')")
             .verifyResults(new String[] { "custom.property\tcustom.value" })
-            .dumpFailure(replicatedDbName)
             .run("alter database " + replicatedDbName
-                    + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')")
-            .dumpFailure(replicatedDbName);   //can not dump the db before first successful incremental load is done.
+                    + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')");
 
     // do a empty incremental load to allow dump of replicatedDbName
     WarehouseInstance.Tuple temp = primary.dump(primaryDbName, Collections.emptyList());
@@ -1178,8 +1176,6 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     replica.load(replicatedDbName, primaryDbName);
     assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
 
-    replica.dumpFailure(replicatedDbName);  //can not dump db which is target of replication
-
     replica.run("ALTER DATABASE " + replicatedDbName + " Set DBPROPERTIES('repl.target.for' = '')");
     assertFalse(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
     replica.dump(replicatedDbName);
@@ -1192,7 +1188,6 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             replica.getDatabase(replicatedDbName).getParameters());
     assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
 
-    replica.dumpFailure(replicatedDbName);    //Cannot dump database which is target of replication.
   }
 
   @Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 1191267..eee5702 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -227,7 +227,7 @@ public class WarehouseInstance implements Closeable {
       driver.getResults(lastResults);
     }
     // Split around the 'tab' character
-    return (lastResults.get(0).split("\\t"))[colNum];
+    return !lastResults.isEmpty() ? (lastResults.get(0).split("\\t"))[colNum] : "";
   }
 
   public WarehouseInstance run(String command) throws Throwable {
@@ -388,7 +388,7 @@ public class WarehouseInstance implements Closeable {
     List<String> lowerCaseData =
         Arrays.stream(data).map(String::toLowerCase).collect(Collectors.toList());
     assertEquals(data.length, filteredResults.size());
-    assertTrue(StringUtils.join(filteredResults, ",") + " does not contain all expected" + StringUtils
+    assertTrue(StringUtils.join(filteredResults, ",") + " does not contain all expected " + StringUtils
             .join(lowerCaseData, ","), filteredResults.containsAll(lowerCaseData));
     return this;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
new file mode 100644
index 0000000..85bbbec
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+/**
+ * Utility class for handling operations regarding optimised bootstrap in case of replication.
+ */
+public class OptimisedBootstrapUtils {
+
+  /** Separator placed between the path, length and checksum of a file entry in the listing. */
+  public static final String FILE_ENTRY_SEPARATOR = "#";
+  private static final Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+
+  /** Name of the table diff directory while its files are still being written. */
+  public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
+
+  /** Name the table diff directory is renamed to once all its files are written. */
+  public static final String TABLE_DIFF_COMPLETE_DIRECTORY = "table_diff_complete";
+
+  /** event ack file which contains the event id till which the cluster was last loaded. */
+  public static final String EVENT_ACK_FILE = "event_ack";
+
+  /**
+   * Tells whether the given database is currently a target of replication.
+   * @param dbName name of database
+   * @param hive hive object used to look the database up
+   * @return true only if the database is found and {@link MetaStoreUtils#isTargetOfReplication} holds for it;
+   *         false when the lookup returns no database.
+   * @throws HiveException propagated from the database lookup
+   */
+  public static boolean isFailover(String dbName, Hive hive) throws HiveException {
+    Database db = hive.getDatabase(dbName);
+    return db != null && MetaStoreUtils.isTargetOfReplication(db);
+  }
+
+  /**
+   * Checks whether an entry with the given name exists directly under the dump path.
+   * @param dumpPath the parent path to look under
+   * @param conf the hive configuration used to resolve the file system of the dump path
+   * @param fileName the name of the child entry to look for
+   * @return true if {@code dumpPath/fileName} exists
+   * @throws IOException propagated from the file system
+   */
+  public static boolean checkFileExists(Path dumpPath, HiveConf conf, String fileName) throws IOException {
+    Path candidate = new Path(dumpPath, fileName);
+    return dumpPath.getFileSystem(conf).exists(candidate);
+  }
+
+  /**
+   * Reads the event id stored in the {@link #EVENT_ACK_FILE} located under the dump path.
+   * The whole file is read using the platform default charset; line separators are removed and the
+   * result is trimmed.
+   * @param dumpPath the dump path containing the event ack file
+   * @param conf the hive configuration
+   * @return the content of the event ack file without line separators and surrounding whitespace.
+   * @throws IOException propagated from opening or reading the file
+   */
+  public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
+    Path ackFile = new Path(dumpPath, EVENT_ACK_FILE);
+    try (FSDataInputStream in = ackFile.getFileSystem(conf).open(ackFile)) {
+      String content = IOUtils.toString(in, Charset.defaultCharset());
+      return content.replaceAll(System.lineSeparator(), "").trim();
+    }
+  }
+
+  /**
+   * Collects the names of the entries found directly under the {@link #TABLE_DIFF_COMPLETE_DIRECTORY}
+   * of the dump path; one entry is written there per table.
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return Set holding the last path component of every entry in the table diff directory
+   * @throws Exception propagated from the file system listing
+   */
+  public static HashSet<String> getTablesFromTableDiffFile(Path dumpPath, HiveConf conf) throws Exception {
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    FileStatus[] entries = fs.listStatus(new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY));
+    return Arrays.stream(entries)
+        .map(entry -> entry.getPath().getName())
+        .collect(Collectors.toCollection(HashSet::new));
+  }
+
+  /**
+   * Reads the listing stored for a table inside the {@link #TABLE_DIFF_COMPLETE_DIRECTORY}.
+   * The file is read fully using the platform default charset and split on the system line separator;
+   * every non-empty line becomes one entry of the result.
+   * @param file the name of the table file inside the table diff directory
+   * @param dumpPath the dump path
+   * @param conf the hive conf
+   * @return the distinct non-empty lines of the table file.
+   * @throws IOException propagated from opening or reading the file
+   */
+  public static HashSet<String> getPathsFromTableFile(String file, Path dumpPath, HiveConf conf) throws IOException {
+    HashSet<String> entries = new HashSet<>();
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    Path tableFile = new Path(new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY), file);
+    String content;
+    try (FSDataInputStream in = fs.open(tableFile)) {
+      content = IOUtils.toString(in, Charset.defaultCharset());
+    }
+    for (String line : content.split(System.lineSeparator())) {
+      if (!line.isEmpty()) {
+        entries.add(line);
+      }
+    }
+    return entries;
+  }
+
+  /**
+   * Gets the event id stored in database denoting the last loaded event id.
+   * @param dbName the name of database
+   * @param hiveDb the hive object
+   * @return event id from the database
+   * @throws HiveException
+   */
+  public static String getReplEventIdFromDatabase(String dbName, Hive hiveDb) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    String currentLastEventId = getLastReplicatedStateFromParameters(database.getParameters());
+    return currentLastEventId;
+  }
+
+  /**
+   * Validates if the first incremental is done before starting optimised bootstrap
+   * @param dbName name of database
+   * @param hiveDb the hive object
+   * @throws HiveException
+   */
+  public static void isFirstIncrementalPending(String dbName, Hive hiveDb) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database == null || ReplUtils.isFirstIncPending(database.getParameters()))
+      throw new HiveException(
+          "Replication dump not allowed for replicated database with first incremental dump pending : " + dbName);
+  }
+
+  /**
+   * Writes the event ack file under the dump path, publishes the dump location and the fake replication id
+   * as the result values of the work, and finally records and writes the dump metadata as an INCREMENTAL
+   * dump. This marks completion of the dump flow for the first round of optimised failover dump.
+   * @param currentDumpPath the dump path under which the event ack file is created
+   * @param dmd the dump metadata to update and write
+   * @param cmRoot the cmRoot recorded in the dump metadata
+   * @param dbEventId the database event id which is written to the event ack file
+   * @param conf the hive configuration
+   * @param work the repl dump work; its result values are set and its eventFrom is read
+   * @return the lastReplId, always -1 to denote a fake dump
+   * @throws SemanticException propagated from writing the event ack file or the dump metadata
+   */
+  public static Long createAndGetEventAckFile(Path currentDumpPath, DumpMetaData dmd, Path cmRoot, String dbEventId,
+      HiveConf conf, ReplDumpWork work)
+      throws SemanticException {
+    // -1 is deliberately not a valid replication id, since this round does not produce an actual dump.
+    final Long fakeReplId = -1L;
+    Path eventAckPath = new Path(currentDumpPath, EVENT_ACK_FILE);
+    Utils.writeOutput(dbEventId, eventAckPath, conf);
+    LOG.info("Created event_ack file at {} with eventId {}", eventAckPath, dbEventId);
+    String dumpLocation = currentDumpPath.toUri().toString();
+    work.setResultValues(Arrays.asList(dumpLocation, String.valueOf(fakeReplId)));
+    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, fakeReplId, cmRoot, -1L, false);
+    dmd.write(true);
+    return fakeReplId;
+  }
+
+  /**
+   * Prepares the table diff directory, holding one file per table that has a notification event after the
+   * specified event id. Each file contains the listing of the table, one entry per line. The files are
+   * written under {@link #TABLE_DIFF_INPROGRESS_DIRECTORY}, which is renamed to
+   * {@link #TABLE_DIFF_COMPLETE_DIRECTORY} once all of them are written.
+   * @param eventId the event id after which tables should be modified
+   * @param hiveDb the hive object
+   * @param work the load work
+   * @param conf hive configuration
+   * @throws Exception if the event with the given id is no longer available, if the table diff directory
+   *         cannot be created or renamed, or if fetching events or listing tables fails
+   */
+  public static void prepareTableDiffFile(Long eventId, Hive hiveDb, ReplLoadWork work, HiveConf conf)
+      throws Exception {
+    // Fetch starting from (eventId - 1), so that the event with eventId itself is the first one returned.
+    List<NotificationEvent> notificationEvents =
+        hiveDb.getMSC().getNextNotification(eventId - 1, -1, new DatabaseAndTableFilter(work.dbNameToLoadIn, null))
+            .getEvents();
+
+    // The first event fetched must be the one we asked for, else the events post that have expired. An empty
+    // result means the same, and must not surface as an IndexOutOfBoundsException.
+    if (notificationEvents == null || notificationEvents.isEmpty()
+        || notificationEvents.get(0).getEventId() != eventId) {
+      throw new Exception("Failover notification events expired.");
+    }
+    // Skip the first one, it is already loaded, we fetched it only to confirm that the notification events
+    // post that haven't expired.
+    HashSet<String> modifiedTables = new HashSet<>();
+    for (NotificationEvent event : notificationEvents.subList(1, notificationEvents.size())) {
+      String tableName = event.getTableName();
+      if (tableName != null) {
+        LOG.debug("Added table {} because of eventId {} and eventType {}", tableName, event.getEventId(),
+            event.getEventType());
+        modifiedTables.add(tableName);
+      }
+    }
+    Path dumpPath = new Path(work.dumpDirectory).getParent();
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    Path diffFilePath = new Path(dumpPath, TABLE_DIFF_INPROGRESS_DIRECTORY);
+    if (!fs.mkdirs(diffFilePath)) {
+      throw new IOException("Unable to create table diff directory " + diffFilePath);
+    }
+    for (String table : modifiedTables) {
+      LOG.info("Added table {} to table diff", table);
+      StringBuilder entries = new StringBuilder();
+      for (String path : getListing(work.dbNameToLoadIn, table, hiveDb, conf)) {
+        entries.append(path).append(System.lineSeparator());
+      }
+      Utils.writeOutput(entries.toString(), new Path(diffFilePath, table), conf);
+    }
+    LOG.info("Completed writing table diff progress file at {} with entries {}", dumpPath, modifiedTables);
+    // The operation is complete, we can rename to TABLE_DIFF_COMPLETE. rename reports failure through its
+    // return value, so check it instead of claiming success unconditionally.
+    Path completePath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+    if (!fs.rename(diffFilePath, completePath)) {
+      throw new IOException("Unable to rename " + diffFilePath + " to " + completePath);
+    }
+    LOG.info("Completed renaming table diff progress file to table diff complete file.");
+  }
+
+  /**
+   * Builds the listing of a table: the table location itself, followed by the recursive listing of the
+   * table directory and of every partition directory that lies outside the table location.
+   * @param dbName the name of database
+   * @param tableName the name of table
+   * @param hiveDb the hive object
+   * @param conf the hive configuration
+   * @return the listing entries, empty if the table is not found
+   * @throws HiveException propagated from fetching the table or its partitions
+   * @throws IOException propagated from the file system
+   */
+  private static ArrayList<String> getListing(String dbName, String tableName, Hive hiveDb, HiveConf conf)
+      throws HiveException, IOException {
+    Table table = hiveDb.getTable(dbName, tableName, false);
+    if (table == null) {
+      LOG.info("Table {} not found, excluding the dropped table", tableName);
+      // If the table is not there, return an empty list of paths, we would need to copy the entire stuff.
+      return new ArrayList<>();
+    }
+    ArrayList<String> paths = new ArrayList<>();
+    Path tableLocation = new Path(table.getSd().getLocation());
+    paths.add(table.getSd().getLocation());
+    buildListingForDirectory(paths, tableLocation, tableLocation.getFileSystem(conf));
+    // Check if the table is partitioned, in case the table is partitioned we need to check for the partitions
+    // listing as well.
+    if (table.isPartitioned()) {
+      List<Partition> partitions = hiveDb.getPartitions(table);
+      for (Partition part : partitions) {
+        Path partPath = part.getDataLocation();
+        // Build listing for the partition only if it doesn't lie within the table location, else it would have
+        // been already included as part of recursive listing of table directory.
+        if (!FileUtils.isPathWithinSubtree(partPath, tableLocation)) {
+          // Such a partition may live on a different file system than the table, so resolve the file system
+          // from the partition path rather than reusing the one of the table.
+          buildListingForDirectory(paths, partPath, partPath.getFileSystem(conf));
+        }
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Recursively appends the content of a directory to the listing. A sub directory is added as its path and
+   * then descended into; a file is added as its path, length and file checksum joined by
+   * {@link #FILE_ENTRY_SEPARATOR}. Nothing is added if the directory does not exist.
+   * @param listing the list to which the entries are appended
+   * @param tableLocation the directory to list
+   * @param tableFs the file system used for listing
+   * @throws IOException propagated from the file system
+   */
+  private static void buildListingForDirectory(ArrayList<String> listing, Path tableLocation, FileSystem tableFs)
+      throws IOException {
+    if (tableFs.exists(tableLocation)) {
+      RemoteIterator<FileStatus> children = tableFs.listStatusIterator(tableLocation);
+      while (children.hasNext()) {
+        FileStatus child = children.next();
+        Path childPath = child.getPath();
+        if (!child.isDirectory()) {
+          String fileEntry = childPath + FILE_ENTRY_SEPARATOR + child.getLen() + FILE_ENTRY_SEPARATOR
+              + tableFs.getFileChecksum(childPath);
+          listing.add(fileEntry);
+          continue;
+        }
+        listing.add(childPath.toString());
+        buildListingForDirectory(listing, childPath, tableFs);
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index de18fe6..a9d728d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -125,6 +125,14 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHO
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.getReplPolicyIdString;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.createAndGetEventAckFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getReplEventIdFromDatabase;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFailover;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirstIncrementalPending;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
 import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.cleanupSnapshots;
@@ -139,6 +147,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final long SLEEP_TIME_FOR_TESTS = 30000;
   private Set<String> tablesForBootstrap = new HashSet<>();
   private List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED);
+  private boolean createEventMarker = false;
 
   public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
     private final String name;
@@ -178,19 +187,21 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         }
         Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot);
         boolean isFailoverMarkerPresent = false;
-        if (previousValidHiveDumpPath == null) {
+        boolean isFailover = isFailover(work.dbNameOrPattern, getHive());
+        LOG.debug("Database is {} going through failover", isFailover ? "" : "not");
+        if (previousValidHiveDumpPath == null && !isFailover) {
           work.setBootstrap(true);
         } else {
-          work.setOldReplScope(new DumpMetaData(previousValidHiveDumpPath, conf).getReplScope());
-          isFailoverMarkerPresent = isDumpFailoverReady(previousValidHiveDumpPath);
+          work.setOldReplScope(isFailover ? null : new DumpMetaData(previousValidHiveDumpPath, conf).getReplScope());
+          isFailoverMarkerPresent = !isFailover && isDumpFailoverReady(previousValidHiveDumpPath);
         }
         //Proceed with dump operation in following cases:
         //1. No previous dump is present.
         //2. Previous dump is already loaded and it is not in failover ready status.
-        if (shouldDump(previousValidHiveDumpPath, isFailoverMarkerPresent)) {
+        if (shouldDump(previousValidHiveDumpPath, isFailoverMarkerPresent, isFailover)) {
           Path currentDumpPath = getCurrentDumpPath(dumpRoot, work.isBootstrap());
           Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-          if (!work.isBootstrap()) {
+          if (!work.isBootstrap() && !isFailover) {
             preProcessFailoverIfRequired(previousValidHiveDumpPath, isFailoverMarkerPresent);
           }
           // Set distCp custom name corresponding to the replication policy.
@@ -211,14 +222,37 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
           Long lastReplId;
           LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET));
-          if (work.isBootstrap()) {
+          if (isFailover) {
+            LOG.info("Optimised Bootstrap Dump triggered for {}.", work.dbNameOrPattern);
+            // Before starting optimised bootstrap, check if the first incremental is done to ensure database is in
+            // consistent state.
+            isFirstIncrementalPending(work.dbNameOrPattern, getHive());
+            // Get the last replicated event id from the database.
+            String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
+            // Check if the tableDiff directory is present or not.
+            boolean isTableDiffDirectoryPresent = checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+            if (createEventMarker) {
+              LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
+              lastReplId = createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, conf, work);
+              finishRemainingTasks();
+            } else {
+              // We should be here only if TableDiff is Present.
+              assert isTableDiffDirectoryPresent;
+              // TODO: Dump using TableDiff file & get lastReplId
+              lastReplId = -1L;
+            }
+          }
+          else if (work.isBootstrap()) {
             lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive());
           } else {
             work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath));
             lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive());
           }
-          work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
-          initiateDataCopyTasks();
+          // The datacopy doesn't need to be initialised in case of optimised bootstrap first dump.
+          if (lastReplId >= 0) {
+            work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
+            initiateDataCopyTasks();
+          }
         } else {
           if (isFailoverMarkerPresent) {
             LOG.info("Previous Dump is failover ready. Skipping this iteration.");
@@ -487,16 +521,46 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return false;
   }
 
-  private boolean shouldDump(Path previousDumpPath, boolean isFailoverMarkerPresent) throws IOException {
+  private boolean shouldDump(Path previousDumpPath, boolean isFailoverMarkerPresent, boolean isFailover)
+      throws IOException, HiveException {
     /** a) If there is no previous dump dir found, the current run is bootstrap case.
      * b) If the previous dump was successful and it contains failover marker file as well as
      * HiveConf.ConfVars.HIVE_REPL_FAILOVER_START == true, last dump was a controlled failover dump,
      * skip doing any further dump.
      */
     if (previousDumpPath == null) {
+      createEventMarker = isFailover;
       return true;
     } else if (isFailoverMarkerPresent && shouldFailover()) {
       return false;
+    } else if (isFailover) {
+      // In case of OptimisedBootstrap Failover, We need to do a dump in case:
+      // 1. No EVENT_ACK file is there.
+      // 2. EVENT_ACK file and TABLE_DIFF_COMPLETE file is also there and the current database id is same as that in
+      // the EVENT_ACK file
+      boolean isEventAckFilePresent = checkFileExists(previousDumpPath.getParent(), conf, EVENT_ACK_FILE);
+      if (!isEventAckFilePresent) {
+        // If in the previous valid dump path, Event_Ack isn't there that means the previous one was a normal dump,
+        // we need to trigger the failover dump
+        LOG.debug("EVENT_ACK file not found in {}. Proceeding with OptimisedBootstrap Failover",
+            previousDumpPath.getParent());
+        createEventMarker = true;
+        return true;
+      }
+      // Event_ACK file is present check if it contains correct value or not.
+      String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf);
+      String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive()).trim();
+      if (!dbEventId.equalsIgnoreCase(fileEventId)) {
+        // In case the database event id changed post table_diff_complete generation, that means both forward &
+        // backward policies are operational, We fail in that case with non-recoverable error.
+        LOG.error("The database eventID {} and the event id in the EVENT_ACK file {} both mismatch. FilePath {}",
+            dbEventId, fileEventId, previousDumpPath.getParent());
+        throw new RuntimeException("Database event id changed post table diff generation.");
+      } else {
+        // Check table_diff_complete and Load_ACK
+        return checkFileExists(previousDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY) && checkFileExists(previousDumpPath,
+            conf, LOAD_ACKNOWLEDGEMENT.toString());
+      }
     } else {
       FileSystem fs = previousDumpPath.getFileSystem(conf);
       return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index bac747f..3a2dda9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
@@ -88,6 +89,10 @@ import java.util.LinkedList;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.prepareTableDiffFile;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_METADATA;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.getExternalTableBaseDir;
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
@@ -680,6 +685,30 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     }
     Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
     Map<String, String> props = new HashMap<>();
+
+    // Check if it is a optimised bootstrap failover.
+    if (work.isFailover) {
+      // Check it should be marked as target of replication & not source of replication.
+      if (MetaStoreUtils.isTargetOfReplication(targetDb)) {
+        LOG.error("The database {} is already marked as target for replication", targetDb.getName());
+        throw new Exception("Failover target is already marked as target");
+      }
+      if (!ReplChangeManager.isSourceOfReplication(targetDb)) {
+        LOG.error("The database {} is already source of replication.", targetDb.getName());
+        throw new Exception("Failover target was not source of replication");
+      }
+      boolean isTableDiffPresent =
+          checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf));
+      if (!isTableDiffPresent) {
+        prepareTableDiffFile(eventId, getHive(), work, conf);
+        if (this.childTasks == null) {
+          this.childTasks = new ArrayList<>();
+        }
+        createReplLoadCompleteAckTask();
+        return 0;
+      }
+    }
     if (!MetaStoreUtils.isTargetOfReplication(targetDb)) {
       props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 7c51df5..cf38651 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_STATS_TOP_EVENTS_COUNTS;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
 
 @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
     Explain.Level.DEFAULT,
@@ -86,6 +87,7 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
   private String scheduledQueryName;
   private String executionId;
   private boolean shouldFailover;
+  public boolean isFailover;
 
   /*
   these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -134,6 +136,8 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
       FileSystem fs = failoverReadyMarker.getFileSystem(hiveConf);
       shouldFailover = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)
               && fs.exists(failoverReadyMarker);
+      isFailover =
+          checkFileExists(new Path(dumpDirectory).getParent(), hiveConf, OptimisedBootstrapUtils.EVENT_ACK_FILE);
       incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
           new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector,
           replStatsTracker, shouldFailover);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index dd611c0..0b9eb57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -183,8 +183,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                     "{} is set to TARGET.", dbName, ReplConst.REPL_FAILOVER_ENDPOINT);
             ReplUtils.unsetDbPropIfSet(database, ReplConst.TARGET_OF_REPLICATION, db);
           } else {
-            LOG.error("Cannot dump database " + dbNameOrPattern + " as it is a target of replication (repl.target.for)");
-            throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg());
+            LOG.warn("Database " + dbNameOrPattern + " is marked as target of replication (repl.target.for), Will "
+                + "trigger failover.");
           }
         }
       } else {