You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/12/13 05:55:25 UTC

[GitHub] [hive] ayushtkn commented on a change in pull request #2795: HIVE-25708. Implement creation of table_diff.

ayushtkn commented on a change in pull request #2795:
URL: https://github.com/apache/hive/pull/2795#discussion_r767429781



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+public class OptimisedBootstrapUtils {
+
+  public static final String FILE_ENTRY_SEPERATOR = "#";
+  private static Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+
+  /** table diff directory when in progress */
+  public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
+
+  /** table diff directory when complete */
+  public static final String TABLE_DIFF_COMPLETE_DIRECTORY = "table_diff_complete";
+
+  /** event ack file which contains the event id till which the cluster was last loaded. */
+  public static final String EVENT_ACK_FILE = "event_ack";
+
+  /**
+   * Gets & checks whether the database is target of replication.
+   * @param dbName name of database
+   * @param hive hive object
+   * @return true, if the database has repl.target.for property set.
+   * @throws HiveException
+   */
+  public static boolean isFailover(String dbName, Hive hive) throws HiveException {
+    Database database = hive.getDatabase(dbName);
+    return database != null ? MetaStoreUtils.isTargetOfReplication(database) : false;
+  }
+
+  public static boolean checkFileExists(Path dumpPath, HiveConf conf, String fileName) throws IOException {
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    return fs.exists(new Path(dumpPath, fileName));
+  }
+
+  /**
+   * Gets the event id from the event ack file
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return the event id from file.
+   * @throws IOException
+   */
+  public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
+    String lastEventId;
+    Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
+    FileSystem fs = eventAckFilePath.getFileSystem(conf);
+    try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
+      lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
+    }
+    return lastEventId.replaceAll(System.lineSeparator(),"").trim();
+  }
+
+  /**
+   * Gets the name of tables in the table diff file.
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return Set with list of tables
+   * @throws Exception
+   */
+  public static HashSet<String> getTablesFromTableDiffFile(Path dumpPath, HiveConf conf) throws Exception {
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+    FileStatus[] list = fs.listStatus(tableDiffPath);
+    HashSet<String> tables = new HashSet<>();

Review comment:
       In general it shouldn't happen, but I don't want duplicates either, so I want to design it in such a way that it can't contain a duplicate.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+public class OptimisedBootstrapUtils {

Review comment:
       Done

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName());
+    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("insert into table t2_managed partition(country='france') values ('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), conf).isEmpty());

Review comment:
       Done

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName());
+    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("insert into table t2_managed partition(country='france') values ('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertTrue(getPathsFromTableFile("t6_managed", new Path(tuple.dumpLocation), conf).size() == 1);
+  }
+
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+
+    // In case of control failover both A & B will be in sync, so the table diff should be created empty, without any
+    // error.
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do a incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and see all the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Trigger reverse cycle. Do dump on target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Even though no diff, the event ack file should be created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a reverse load.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory still gets created.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff is empty, since we are in sync, so no tables got modified.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with first incremental dump pending "));
+    }
+  }
+
+  @Test
+  public void testFailureCasesInTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('A')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and check the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id string)")
+        .run("insert into table t1_managed values ('S')")
+        .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Trigger dump on target cluster.
+
+    replicaFs.setQuota(newReplDir, 1, 10000);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignored: the failure is expected because of the quota violation.
+    }
+

Review comment:
       Done

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  /**
+   * Collects the configuration overrides used by this test class and passes the same map for both override
+   * arguments of the base class setup.
+   */
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    String messageEncoder = GzipJSONMessageEncoder.class.getCanonicalName();
+    String currentUser = UserGroupInformation.getCurrentUser().getUserName();
+
+    HashMap<String, String> overrides = new HashMap<>();
+    // Replication behaviour switches.
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+    // Run distcp as the user executing the tests.
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
+    // Metastore event message encoding.
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), messageEncoder);
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+  }
+
+  /**
+   * Runs the base class setup first and then derives the extra database name from the primary database name.
+   */
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    // Computed after super.setup() so that primaryDbName is read only once the base setup has run.
+    this.extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  /**
+   * Runs a bootstrap and an incremental cycle from primary to replica, modifies the primary afterwards, and then
+   * drives the reverse cycle: a dump on the replica must create the event ack file, a repeated dump must be skipped,
+   * and a load on the primary must create the table diff directory listing every modified table. A repeated load
+   * must leave the dump directory unchanged.
+   */
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("insert into table t2_managed partition(country='france') values ('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    // Re-read the table diff after the second load; asserting on the set read before it would prove nothing.
+    tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    // assertEquals reports the actual size on failure, unlike assertTrue on a boolean comparison.
+    assertEquals(1, getPathsFromTableFile("t6_managed", new Path(tuple.dumpLocation), conf).size());
+  }
+
+  /**
+   * Covers a control failover, where both clusters are in sync: the reverse dump must still create the event ack
+   * file, and the reverse load must still create the table diff directory, with no table listed in it.
+   */
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+    // Forward direction (A->B) uses the primary's repl directory.
+    List<String> forwardClause = ReplicationTestUtils.includeExternalTableClause(true);
+    forwardClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Bootstrap cycle (A->B).
+    primary.dump(primaryDbName, forwardClause);
+    replica.load(replicatedDbName, primaryDbName, forwardClause);
+
+    // Create one external and one managed table with some rows, then take an incremental dump.
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, forwardClause);
+
+    // Incremental load: the replica must be at the dumped replication id and have both tables.
+    replica.load(replicatedDbName, primaryDbName, forwardClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Reverse direction (B->A) uses a fresh repl directory on the replica's file system.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path reverseReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(reverseReplDir);
+    List<String> reverseClause = ReplicationTestUtils.includeExternalTableClause(true);
+    reverseClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + reverseReplDir + "'");
+
+    // Reverse dump on the target cluster.
+    WarehouseInstance.Tuple reverseDump = replica.dump(replicatedDbName, reverseClause);
+
+    // The event ack file must be there even though there is no diff.
+    Path eventAckFile = new Path(reverseDump.dumpLocation, EVENT_ACK_FILE);
+    assertTrue(eventAckFile.toString() + " doesn't exist", replicaFs.exists(eventAckFile));
+
+    // Reverse load on the original source.
+    primary.load(primaryDbName, replicatedDbName, reverseClause);
+
+    // The table diff directory must still get created.
+    Path tableDiffDir = new Path(reverseDump.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY);
+    assertTrue(tableDiffDir.toString() + " doesn't exist", replicaFs.exists(tableDiffDir));
+
+    // Both sides are in sync, so no table may show up in the table diff.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(reverseDump.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with first incremental dump pending "));

Review comment:
       Done

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  /**
+   * Collects the configuration overrides used by this test class and passes the same map for both override
+   * arguments of the base class setup.
+   */
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    String messageEncoder = GzipJSONMessageEncoder.class.getCanonicalName();
+    String currentUser = UserGroupInformation.getCurrentUser().getUserName();
+
+    HashMap<String, String> overrides = new HashMap<>();
+    // Replication behaviour switches.
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+    // Run distcp as the user executing the tests.
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
+    // Metastore event message encoding.
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), messageEncoder);
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+  }
+
+  /**
+   * Runs the base class setup first and then derives the extra database name from the primary database name.
+   */
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    // Computed after super.setup() so that primaryDbName is read only once the base setup has run.
+    this.extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  /**
+   * Runs a bootstrap and an incremental cycle from primary to replica, modifies the primary afterwards, and then
+   * drives the reverse cycle: a dump on the replica must create the event ack file, a repeated dump must be skipped,
+   * and a load on the primary must create the table diff directory listing every modified table. A repeated load
+   * must leave the dump directory unchanged.
+   */
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("insert into table t2_managed partition(country='france') values ('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    // Re-read the table diff after the second load; asserting on the set read before it would prove nothing.
+    tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    // assertEquals reports the actual size on failure, unlike assertTrue on a boolean comparison.
+    assertEquals(1, getPathsFromTableFile("t6_managed", new Path(tuple.dumpLocation), conf).size());
+  }
+
+  /**
+   * Control failover scenario: source (A) and target (B) are fully in sync when the reverse cycle is triggered,
+   * so the reverse dump must still write the event ack file and the reverse load must still produce the
+   * table diff directory, with no table listed in it.
+   *
+   * @throws Throwable if any replication command fails.
+   */
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+    // Forward (A->B) replication clause, pointing at the primary's repl directory.
+    List<String> forwardClause = ReplicationTestUtils.includeExternalTableClause(true);
+    forwardClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Bootstrap cycle (A->B).
+    primary.dump(primaryDbName, forwardClause);
+    replica.load(replicatedDbName, primaryDbName, forwardClause);
+
+    // Create one external and one managed table, then take an incremental dump.
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, forwardClause);
+
+    // Incremental load; both tables must be visible on the target afterwards.
+    replica.load(replicatedDbName, primaryDbName, forwardClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Set up the reverse (B->A) cycle with its own repl directory on the target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path reverseReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(reverseReplDir);
+    List<String> reverseClause = ReplicationTestUtils.includeExternalTableClause(true);
+    reverseClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + reverseReplDir + "'");
+
+    // Reverse dump, taken on the target cluster.
+    WarehouseInstance.Tuple reverseDump = replica.dump(replicatedDbName, reverseClause);
+
+    // The event ack file is expected even though there is nothing to diff.
+    Path eventAckFile = new Path(reverseDump.dumpLocation, EVENT_ACK_FILE);
+    assertTrue(eventAckFile + " doesn't exist", replicaFs.exists(eventAckFile));
+
+    // Reverse load, executed on the original source cluster.
+    primary.load(primaryDbName, replicatedDbName, reverseClause);
+
+    // The completed table diff directory is expected as well.
+    Path tableDiffCompleteDir = new Path(reverseDump.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY);
+    assertTrue(tableDiffCompleteDir + " doesn't exist", replicaFs.exists(tableDiffCompleteDir));
+
+    // Both clusters are in sync, hence no table may be reported as modified.
+    HashSet<String> modifiedTables = getTablesFromTableDiffFile(new Path(reverseDump.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + modifiedTables, 0, modifiedTables.size());
+  }
+
+  /**
+   * Verifies that a reverse dump from the target cluster is rejected when it is attempted right after the
+   * bootstrap cycle, i.e. before the first incremental cycle has been done.
+   *
+   * @throws Throwable if any replication command fails in an unexpected way.
+   */
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+      // Without an explicit failure here the test would pass silently if the dump were wrongly allowed.
+      fail("Reverse dump should have failed, since the first incremental dump is still pending");
+    } catch (HiveException he) {
+      assertTrue("Unexpected failure message: " + he.getMessage(), he.getMessage()
+          .contains("Replication dump not allowed for replicated database with first incremental dump pending "));
+    }
+  }
+
+  @Test
+  public void testFailureCasesInTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('A')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and check the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id string)")
+        .run("insert into table t1_managed values ('S')")
+        .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Trigger dump on target cluster.
+
+    replicaFs.setQuota(newReplDir, 1, 10000);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignore it is expected due to Quota violation.
+    }

Review comment:
       Done

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  // Name of an additional database on the primary cluster; assigned in setup() as "extra_" + primaryDbName,
+  // created by individual tests to generate unrelated events, and dropped again in tearDown().
+  String extraPrimaryDb;
+
+  /**
+   * Builds the configuration overrides used by this test class and hands the same overrides to both the primary
+   * and the replica through {@code internalBeforeClassSetupExclusiveReplica}.
+   *
+   * @throws Exception if the current user cannot be resolved or the base class setup fails.
+   */
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    String messageEncoder = GzipJSONMessageEncoder.class.getCanonicalName();
+    String distCpUser = UserGroupInformation.getCurrentUser().getUserName();
+
+    HashMap<String, String> clusterConf = new HashMap<>();
+    // Event message encoding.
+    clusterConf.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), messageEncoder);
+    // Replication behaviour: dump data as well as metadata, include external tables, copy data on the target.
+    clusterConf.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    clusterConf.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    clusterConf.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+    // Run distcp as the user executing the tests.
+    clusterConf.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, distCpUser);
+
+    internalBeforeClassSetupExclusiveReplica(clusterConf, clusterConf, TestReplicationOptimisedBootstrap.class);
+  }
+
+  /**
+   * Runs the base class setup and then derives the name of the extra primary database from the primary
+   * database name.
+   *
+   * @throws Throwable if the base class setup fails.
+   */
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    final String extraDbName = "extra_" + primaryDbName;
+    this.extraPrimaryDb = extraDbName;
+  }
+
+  /**
+   * Drops the extra primary database, if a test created it, and then runs the base class cleanup.
+   *
+   * @throws Throwable if dropping the database or the base class cleanup fails.
+   */
+  @After
+  public void tearDown() throws Throwable {
+    try {
+      primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    } finally {
+      // Always run the base cleanup, even if dropping the extra database fails, so that state does not leak
+      // into the following tests.
+      super.tearDown();
+    }
+  }
+
+  /**
+   * End-to-end check of table diff generation for the reverse replication cycle.
+   * <p>
+   * After a bootstrap and an incremental cycle (A->B), the source is modified further, a reverse dump is taken on
+   * the target and loaded on the source. The test verifies that the event ack file and the completed table diff
+   * directory are created, that the table diff lists every table modified on the source, and that repeating the
+   * dump or the load without the previous one being consumed leaves the dump directory untouched.
+   *
+   * @throws Throwable if any replication command fails.
+   */
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("insert into table t2_managed partition(country='france') values ('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    // Read the table diff again; asserting on the set read before the second load would not verify anything.
+    tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertEquals(1, getPathsFromTableFile("t6_managed", new Path(tuple.dumpLocation), conf).size());
+  }
+
+  /**
+   * Control failover scenario: source (A) and target (B) are fully in sync when the reverse cycle is triggered,
+   * so the reverse dump must still write the event ack file and the reverse load must still produce the
+   * table diff directory, with no table listed in it.
+   *
+   * @throws Throwable if any replication command fails.
+   */
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+    // Forward (A->B) replication clause, pointing at the primary's repl directory.
+    List<String> forwardClause = ReplicationTestUtils.includeExternalTableClause(true);
+    forwardClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Bootstrap cycle (A->B).
+    primary.dump(primaryDbName, forwardClause);
+    replica.load(replicatedDbName, primaryDbName, forwardClause);
+
+    // Create one external and one managed table, then take an incremental dump.
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, forwardClause);
+
+    // Incremental load; both tables must be visible on the target afterwards.
+    replica.load(replicatedDbName, primaryDbName, forwardClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Set up the reverse (B->A) cycle with its own repl directory on the target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path reverseReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(reverseReplDir);
+    List<String> reverseClause = ReplicationTestUtils.includeExternalTableClause(true);
+    reverseClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + reverseReplDir + "'");
+
+    // Reverse dump, taken on the target cluster.
+    WarehouseInstance.Tuple reverseDump = replica.dump(replicatedDbName, reverseClause);
+
+    // The event ack file is expected even though there is nothing to diff.
+    Path eventAckFile = new Path(reverseDump.dumpLocation, EVENT_ACK_FILE);
+    assertTrue(eventAckFile + " doesn't exist", replicaFs.exists(eventAckFile));
+
+    // Reverse load, executed on the original source cluster.
+    primary.load(primaryDbName, replicatedDbName, reverseClause);
+
+    // The completed table diff directory is expected as well.
+    Path tableDiffCompleteDir = new Path(reverseDump.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY);
+    assertTrue(tableDiffCompleteDir + " doesn't exist", replicaFs.exists(tableDiffCompleteDir));
+
+    // Both clusters are in sync, hence no table may be reported as modified.
+    HashSet<String> modifiedTables = getTablesFromTableDiffFile(new Path(reverseDump.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + modifiedTables, 0, modifiedTables.size());
+  }
+
+  /**
+   * Verifies that a reverse dump from the target cluster is rejected when it is attempted right after the
+   * bootstrap cycle, i.e. before the first incremental cycle has been done.
+   *
+   * @throws Throwable if any replication command fails in an unexpected way.
+   */
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+      // Without an explicit failure here the test would pass silently if the dump were wrongly allowed.
+      fail("Reverse dump should have failed, since the first incremental dump is still pending");
+    } catch (HiveException he) {
+      assertTrue("Unexpected failure message: " + he.getMessage(), he.getMessage()
+          .contains("Replication dump not allowed for replicated database with first incremental dump pending "));
+    }
+  }
+
+  @Test
+  public void testFailureCasesInTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('A')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and check the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id string)")
+        .run("insert into table t1_managed values ('S')")
+        .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Trigger dump on target cluster.
+
+    replicaFs.setQuota(newReplDir, 1, 10000);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignore it is expected due to Quota violation.
+    }
+
+    // Remove the quota, i.e solve the reason for failure and retry dump.
+    replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+
+    // Retry Dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check event ack file now gets created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Set quota again to restrict creation of table diff in middle during load.
+    replicaFs.setQuota(newReplDir, replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 2, QUOTA_RESET);
+
+    try {
+      primary.load(primaryDbName, replicatedDbName, withClause);
+    } catch (Exception e) {
+      // Ignore, expected due to quota violation.
+    }
+
+    // Check table diff in progress directory gets created.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+    // Check table diff complete directory doesn't get created.
+    assertFalse(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Fix the quota issue and re-attempt.

Review comment:
       Done

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  /**
+   * Builds the configuration overrides used by this test class and hands them to the base-class setup.
+   * The same override map is passed for both override arguments of
+   * {@code internalBeforeClassSetupExclusiveReplica}.
+   */
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    String messageEncoder = GzipJSONMessageEncoder.class.getCanonicalName();
+    String currentUser = UserGroupInformation.getCurrentUser().getUserName();
+
+    HashMap<String, String> clusterConf = new HashMap<>();
+    clusterConf.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), messageEncoder);
+    clusterConf.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    clusterConf.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    clusterConf.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
+    clusterConf.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+
+    internalBeforeClassSetupExclusiveReplica(clusterConf, clusterConf, TestReplicationOptimisedBootstrap.class);
+  }
+
+  /**
+   * Runs the base-class setup, then derives the name of the extra database from the primary database name.
+   */
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    this.extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  /**
+   * Drops the extra database created by the tests and then runs the base-class cleanup.
+   * The base-class cleanup is executed even if dropping the extra database fails.
+   */
+  @After
+  public void tearDown() throws Throwable {
+    try {
+      primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    } finally {
+      // Always run the base-class cleanup so a failed drop doesn't leak state into the next test.
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Runs a bootstrap and an incremental cycle from primary to replica, modifies the primary afterwards, and then
+   * triggers a reverse dump (on the replica) and a reverse load (on the primary). Verifies that the event ack file
+   * and the table diff complete directory are created, that repeated dump/load attempts leave the dump directory
+   * unchanged, and that the table diff lists every table modified on the primary.
+   */
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("insert into table t2_managed partition(country='france') values ('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    // Re-read the table diff after the second load; asserting on the previously fetched set would verify nothing.
+    tableDiffEntries = getTablesFromTableDiffFile(new Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", "t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new Path(tuple.dumpLocation), conf).isEmpty());
+    assertEquals(1, getPathsFromTableFile("t6_managed", new Path(tuple.dumpLocation), conf).size());
+  }
+
+  /**
+   * In a control failover both clusters (A & B) are in sync, so the reverse cycle must still create the event ack
+   * file and the table diff complete directory, but the table diff itself must be empty.
+   */
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+    List<String> forwardClause = ReplicationTestUtils.includeExternalTableClause(true);
+    forwardClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Bootstrap cycle (A->B).
+    primary.dump(primaryDbName, forwardClause);
+    replica.load(replicatedDbName, primaryDbName, forwardClause);
+
+    // Create one external and one managed table on A, then take an incremental dump.
+    WarehouseInstance.Tuple incrementalTuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, forwardClause);
+
+    // Incremental load on B: both tables must be visible and the replication id must match the dump.
+    replica.load(replicatedDbName, primaryDbName, forwardClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(incrementalTuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Start the reverse cycle using a fresh repl directory on the replica's file system.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path reverseReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(reverseReplDir);
+    List<String> reverseClause = ReplicationTestUtils.includeExternalTableClause(true);
+    reverseClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + reverseReplDir + "'");
+
+    // Reverse dump (B->A).
+    WarehouseInstance.Tuple reverseTuple = replica.dump(replicatedDbName, reverseClause);
+    Path reverseDumpPath = new Path(reverseTuple.dumpLocation);
+
+    // The event ack file must exist even though there is no diff.
+    Path eventAckPath = new Path(reverseDumpPath, EVENT_ACK_FILE);
+    assertTrue(eventAckPath.toString() + " doesn't exist", replicaFs.exists(eventAckPath));
+
+    // Reverse load.
+    primary.load(primaryDbName, replicatedDbName, reverseClause);
+
+    // The table diff complete directory must still be created.
+    Path tableDiffCompletePath = new Path(reverseDumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+    assertTrue(tableDiffCompletePath.toString() + " doesn't exist", replicaFs.exists(tableDiffCompletePath));
+
+    // Both sides are in sync, so no table may be listed in the table diff.
+    HashSet<String> modifiedTables = getTablesFromTableDiffFile(reverseDumpPath, conf);
+    assertEquals("Table diff is not empty, contains :" + modifiedTables, 0, modifiedTables.size());
+  }
+
+  /**
+   * Verifies that a reverse dump triggered right after the bootstrap cycle, i.e. before the first incremental cycle
+   * has been done, is rejected with a {@link HiveException}.
+   */
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create one external and one managed table and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+      // Without this the test would pass silently when the dump is wrongly allowed.
+      fail("Dump should have failed since the first incremental dump is pending");
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with first incremental dump pending "));
+    }
+  }

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+public class OptimisedBootstrapUtils {
+
+  public static final String FILE_ENTRY_SEPERATOR = "#";
+  private static Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+
+  /** table diff directory when in progress */
+  public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
+
+  /** table diff directory when complete */
+  public static final String TABLE_DIFF_COMPLETE_DIRECTORY = "table_diff_complete";
+
+  /** event ack file which contains the event id till which the cluster was last loaded. */
+  public static final String EVENT_ACK_FILE = "event_ack";
+
+  /**
+   * Looks up the database and checks whether it is a target of replication.
+   * @param dbName name of database
+   * @param hive hive object used to fetch the database
+   * @return the result of {@link MetaStoreUtils#isTargetOfReplication(Database)} for the database, or false when
+   *         the lookup returns null.
+   * @throws HiveException
+   */
+  public static boolean isFailover(String dbName, Hive hive) throws HiveException {
+    Database db = hive.getDatabase(dbName);
+    if (db == null) {
+      return false;
+    }
+    return MetaStoreUtils.isTargetOfReplication(db);
+  }
+
+  /**
+   * Checks whether an entry with the given name exists directly under the dump path.
+   * @param dumpPath the dump path
+   * @param conf the hive configuration used to resolve the file system of the dump path
+   * @param fileName name of the entry to look for under the dump path
+   * @return true if the entry exists.
+   * @throws IOException
+   */
+  public static boolean checkFileExists(Path dumpPath, HiveConf conf, String fileName) throws IOException {
+    FileSystem dumpFs = dumpPath.getFileSystem(conf);
+    Path candidate = new Path(dumpPath, fileName);
+    return dumpFs.exists(candidate);
+  }
+
+  /**
+   * Reads the event ack file under the dump path and returns its content as the event id.
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return the file content with line separators removed and surrounding whitespace trimmed.
+   * @throws IOException
+   */
+  public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
+    Path ackFile = new Path(dumpPath, EVENT_ACK_FILE);
+    FileSystem ackFs = ackFile.getFileSystem(conf);
+    String rawContent;
+    try (FSDataInputStream in = ackFs.open(ackFile)) {
+      rawContent = IOUtils.toString(in, Charset.defaultCharset());
+    }
+    String withoutLineBreaks = rawContent.replaceAll(System.lineSeparator(), "");
+    return withoutLineBreaks.trim();
+  }
+
+  /**
+   * Lists the table diff complete directory under the dump path and collects the names of its entries.
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return Set with the name of every entry in the table diff complete directory
+   * @throws Exception
+   */
+  public static HashSet<String> getTablesFromTableDiffFile(Path dumpPath, HiveConf conf) throws Exception {
+    FileSystem dumpFs = dumpPath.getFileSystem(conf);
+    FileStatus[] entries = dumpFs.listStatus(new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY));
+    return Arrays.stream(entries)
+        .map(entry -> entry.getPath().getName())
+        .collect(Collectors.toCollection(HashSet::new));
+  }
+
+  /**
+   * Reads the file of a table from the table diff complete directory and returns its non-empty lines.
+   * @param file the name of table
+   * @param dumpPath the dump path
+   * @param conf the hive conf
+   * @return the distinct, non-empty lines of the table file.
+   * @throws IOException
+   */
+  public static HashSet<String> getPathsFromTableFile(String file, Path dumpPath, HiveConf conf) throws IOException {
+    FileSystem dumpFs = dumpPath.getFileSystem(conf);
+    Path tableFile = new Path(new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY), file);
+    String content;
+    try (FSDataInputStream in = dumpFs.open(tableFile)) {
+      content = IOUtils.toString(in, Charset.defaultCharset());
+    }
+    HashSet<String> paths = new HashSet<>();
+    for (String line : content.split(System.lineSeparator())) {
+      // Skip blank entries, e.g. the one produced by an empty file.
+      if (!line.isEmpty()) {
+        paths.add(line);
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Fetches the database and extracts the last replicated state from its parameters.
+   * @param dbName the name of database
+   * @param hiveDb the hive object
+   * @return the value returned by {@code getLastReplicatedStateFromParameters} for the database parameters
+   * @throws HiveException
+   */
+  public static String getReplEventIdFromDatabase(String dbName, Hive hiveDb) throws HiveException {
+    return getLastReplicatedStateFromParameters(hiveDb.getDatabase(dbName).getParameters());
+  }
+
+  /**
+   * Validates if the first incremental is done before starting optimised bootstrap
+   * @param dbName name of database
+   * @param hiveDb the hive object
+   * @throws HiveException
+   */
+  public static void isFirstIncrementalPending(String dbName, Hive hiveDb) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database == null || ReplUtils.isFirstIncPending(database.getParameters()))
+      throw new HiveException(
+          "Replication dump not allowed for replicated database with first incremental dump pending : " + dbName);
+  }
+
+  /**
+   * Creates the event ack file and sets the dump metadata post that marking completion of dump flow for first round
+   * of optimised failover dump.
+   * @param currentDumpPath the dump path
+   * @param dmd the dump metadata
+   * @param cmRoot the cmRoot
+   * @param dbEventId the database event id to which we have to write in the file.
+   * @param conf the hive configuration
+   * @param work the repldump work
+   * @return the lastReplId denoting a fake dump(-1) always
+   * @throws SemanticException
+   */
+  public static Long getAndCreateEventAckFile(Path currentDumpPath, DumpMetaData dmd, Path cmRoot, String dbEventId,

Review comment:
       Yeps, Changed!!!

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  /**
+   * Builds the configuration overrides used by this test class and hands them to the base-class setup.
+   * The same override map is passed for both override arguments of
+   * {@code internalBeforeClassSetupExclusiveReplica}.
+   */
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    String messageEncoder = GzipJSONMessageEncoder.class.getCanonicalName();
+    String currentUser = UserGroupInformation.getCurrentUser().getUserName();
+
+    HashMap<String, String> clusterConf = new HashMap<>();
+    clusterConf.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), messageEncoder);
+    clusterConf.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    clusterConf.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    clusterConf.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
+    clusterConf.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
+
+    internalBeforeClassSetupExclusiveReplica(clusterConf, clusterConf, TestReplicationOptimisedBootstrap.class);
+  }
+
+  /**
+   * Runs the base-class setup, then derives the name of the extra database from the primary database name.
+   */
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    this.extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  /**
+   * Drops the extra database created by the tests and then runs the base-class cleanup.
+   * The base-class cleanup is executed even if dropping the extra database fails.
+   */
+  @After
+  public void tearDown() throws Throwable {
+    try {
+      primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    } finally {
+      // Always run the base-class cleanup so a failed drop doesn't leak state into the next test.
+      super.tearDown();
+    }
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country string)")
+        .run("insert into table t2_managed partition(country='india') values ('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);

Review comment:
       The dump directory will be different, so that policy won't be bothered by that. We in general don't allow that. The scenario is same as accidental dump & load on wrong sites.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -680,6 +685,32 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception {
     }
     Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
     Map<String, String> props = new HashMap<>();
+
+    // Check if it is an optimised bootstrap failover.

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -680,6 +685,32 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception {
     }
     Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
     Map<String, String> props = new HashMap<>();
+
+    // Check if it is an optimised bootstrap failover.
+    if (work.isFailover) {
+      // Check it should be marked as target of replication & not source of replication.
+      if (MetaStoreUtils.isTargetOfReplication(targetDb)) {
+        LOG.error("The database {} is already marked as target for replication", targetDb.getName());
+        throw new Exception("Failover target is already marked as target");
+      }
+      if (!ReplChangeManager.isSourceOfReplication(targetDb)) {
+        LOG.error("The database {} is already source of replication.", targetDb.getName());
+        throw new Exception("Failover target was not source of replication");
+      }
+      boolean isTableDiffPresent =
+          checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf));
+      if (!isTableDiffPresent) {
+        prepareTableDiffFile(eventId, getHive(), work, conf);
+        if (this.childTasks == null) {
+          this.childTasks = new ArrayList<>();
+        }
+        createReplLoadCompleteAckTask();
+        return 0;
+      } else {
+        // TODO : Load using table_diff

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -385,7 +419,7 @@ private void finishRemainingTasks() throws SemanticException {
     Utils.create(dumpAckFile, conf);
     prepareReturnValues(work.getResultValues());
     work.getMetricCollector().reportEnd(isFailoverInProgress ? Status.FAILOVER_READY : Status.SUCCESS);
-    deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
+      deleteAllPreviousDumpMeta(work.getCurrentDumpPath());

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org