You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2022/04/17 06:03:04 UTC

[hive] branch master updated: HIVE-25921: Overwrite table metadata for bootstraped tables. (#2993). (Ayush Saxena, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 61d4ff2be4 HIVE-25921: Overwrite table metadata for bootstraped tables. (#2993). (Ayush Saxena, reviewed by Denys Kuzmenko)
61d4ff2be4 is described below

commit 61d4ff2be48b20df9fd24692c372ee9c2606babe
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sun Apr 17 11:32:51 2022 +0530

    HIVE-25921: Overwrite table metadata for bootstraped tables. (#2993). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 133 +++++++++++++++++++++
 .../ql/ddl/table/create/CreateTableOperation.java  |   3 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  11 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  21 +++-
 .../repl/bootstrap/load/table/LoadPartitions.java  |  33 +++--
 .../exec/repl/bootstrap/load/table/LoadTable.java  |  22 ++--
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |   9 ++
 .../hadoop/hive/ql/plan/ImportTableDesc.java       |   4 +
 8 files changed, 207 insertions(+), 29 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 5782d20640..537d860434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -934,6 +934,139 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
         .verifyResults(new String[] { "beejing", "chengdu" });
   }
 
+  @Test
+  public void testOverwriteDuringBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle.
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Create some partitioned and non partitioned tables and do a dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create table t2 (id int)")
+        .run("insert into table t2 values (15),(16)")
+        .run("create table t3 (place string) partitioned by (country string)")
+        .run("insert into table t3 partition(country='india') values ('chennai')")
+        .run("insert into table t3 partition(country='us') values ('new york')")
+        .run("create table t4 (place string) partitioned by (country string)")
+        .run("insert into table t4 partition(country='china') values ('beejing')")
+        .run("insert into table t4 partition(country='nepal') values ('kathmandu')")
+        .dump(primaryDbName, withClause);
+
+    // Do the load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Prepare for reverse bootstrap.
+    // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
+    // Create one new table: It should get dropped. (tnew_managed)
+    // Create some new partition: The new partition should get dropped. (t2: france)
+    // Modify a table, the data should get overwritten. (t1)
+    // Modify a partition, the data should be overwritten. (t3: india value delhi)
+    // Drop a table, the table should be recreated(t2)
+    primary.run("use " + primaryDbName)
+        .run("create table tnew_managed (id int)")
+        .run("insert into table t1 values (25)")
+        .run("insert into table tnew_managed values (110)")
+        .run("insert into table t3 partition(country='france') values ('lyon')")
+        .run("insert into table t3 partition(country='india') values ('delhi')")
+        .run("drop table t2");
+
+    // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t2, t4, t5: incremental)
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (101)")
+        .run("insert into table t1 values (121),(211)")
+        .run("insert into table t3 partition(country='india') values ('lucknow')")
+        .run("insert into table t2 values (11)")
+        .run("insert into table t4 partition(country='india') values ('kanpur')")
+        .run("create table t5 (place string) partitioned by (country string)")
+        .run("insert into table t5 partition(country='china') values ('beejing')")
+        .run("insert into table t4 partition(country='china') values ('Shanghai')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
+
+    // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
+    // rest.
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // _bootstrap directory should be created as bootstrap enabled on external tables.
+    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+    FileStatus[] listStatus =
+        dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
+    ArrayList<String> tablesBootstrapped = new ArrayList<String>();
+    for (FileStatus file : listStatus) {
+      tablesBootstrapped.add(file.getPath().getName());
+    }
+
+    assertTrue(tablesBootstrapped.containsAll(Arrays.asList("t1", "t2", "t3")));
+
+    // Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    primary.run("use " + primaryDbName)
+        .run("select id from t1")
+        .verifyResults(new String[] { "1", "2", "3", "4", "101", "121", "211" })
+        .run("select id from t2")
+        .verifyResults(new String[] { "15", "16", "11" })
+        .run("select place from t3 where country = 'india'")
+        .verifyResults(new String[] {"chennai", "lucknow" })
+        .run("select place from t3 where country = 'us'")
+        .verifyResults(new String[] {"new york" })
+        .run("select place from t3 where country = 'france'")
+        .verifyFailure(new String[] { "lyon" })
+        .run("select place from t4 where country = 'china'")
+        .verifyResults(new String[] { "beejing", "Shanghai" })
+        .run("select place from t4 where country = 'india'")
+        .verifyResults(new String[] { "kanpur" })
+        .run("select place from t5 where country = 'china'")
+        .verifyResults(new String[] { "beejing" })
+        .run("show tables like 'tnew_managed'")
+        .verifyFailure(new String[]{"tnew_managed"});
+  }
+
   @NotNull
   private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
     List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
index ef327f4435..6dacbb0b7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
@@ -66,7 +66,8 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
       Table existingTable = context.getDb().getTable(tbl.getDbName(), tbl.getTableName(), false);
       if (existingTable != null) {
         Map<String, String> dbParams = context.getDb().getDatabase(existingTable.getDbName()).getParameters();
-        if (desc.getReplicationSpec().allowEventReplacementInto(dbParams)) {
+        if (desc.getReplicationSpec().allowEventReplacementInto(dbParams) || desc.getReplicationSpec()
+            .isForceOverwrite()) {
           desc.setReplaceMode(true); // we replace existing table.
           // If location of an existing managed table is changed, then need to delete the old location if exists.
           // This scenario occurs when a managed table is converted into external table at source. In this case,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index f3b0a0ecd6..a8a333f640 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -311,7 +311,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         } else {
           LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext,
               loadTaskTracker, work.getMetricCollector());
-          tableTracker = loadTable.tasks(work.isIncrementalLoad());
+          tableTracker = loadTable.tasks(work.isIncrementalLoad(), work.isSecondFailover);
         }
 
         setUpDependencies(dbTracker, tableTracker);
@@ -336,7 +336,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
           // for a table we explicitly try to load partitions as there is no separate partitions events.
           LoadPartitions loadPartitions =
               new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent,
-                  work.dbNameToLoadIn, tableContext, work.getMetricCollector());
+                  work.dbNameToLoadIn, tableContext, work.getMetricCollector(), work.tablesToBootstrap);
           TaskTracker partitionsTracker = loadPartitions.tasks();
           partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
               partitionsTracker);
@@ -445,7 +445,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     LoadPartitions loadPartitions =
         new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
         event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector(),
-          event.lastPartSpecReplicated(), event.lastStageReplicated());
+          event.lastPartSpecReplicated(), event.lastStageReplicated(), getWork().tablesToBootstrap);
         /*
              the tableTracker here should be a new instance and not an existing one as this can
              only happen when we break in between loading partitions.
@@ -733,10 +733,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         return 0;
       }
     } else if (work.isSecondFailover) {
-      // DROP the tables to be bootstrapped.
+      // DROP the tables extra on target, which are not on source cluster.
 
       Hive db = getHive();
-      for (String table : work.tablesToBootstrap) {
+      for (String table : work.tablesToDrop) {
+        LOG.info("Dropping table {} for optimised bootstarap", work.dbNameToLoadIn + "." + table);
         db.dropTable(work.dbNameToLoadIn + "." + table, true);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 271862aa59..b6072912c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,10 +52,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
 import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
@@ -93,7 +96,8 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
   private boolean shouldFailover;
   public boolean isFirstFailover;
   public boolean isSecondFailover;
-  public List<String> tablesToBootstrap = new ArrayList<>();
+  public List<String> tablesToBootstrap = new LinkedList<>();
+  public List<String> tablesToDrop = new LinkedList<>();
 
   /*
   these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -157,10 +161,17 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
       Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
       if (fs.exists(incBootstrapDir)) {
         if (isSecondFailover) {
-          String[] tableList = getBootstrapTableList(dumpDirParent, hiveConf);
-          tablesToBootstrap = Arrays.asList(tableList);
-          LOG.info("Optimised bootstrap for database {} with load with bootstrap table list as {}", dbNameToLoadIn,
-              tablesToBootstrap);
+          String[] bootstrappedTables = getBootstrapTableList(new Path(dumpDirectory).getParent(), hiveConf);
+          LOG.info("Optimised bootstrap load for database {} with initial bootstrapped table list as {}",
+              dbNameToLoadIn, tablesToBootstrap);
+          // Get list of tables bootstrapped.
+          Path tableMetaPath = new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME + "/" + sourceDbName);
+          tablesToBootstrap =
+              Stream.of(fs.listStatus(tableMetaPath)).map(st -> st.getPath().getName()).collect(Collectors.toList());
+          List<String> tableList = Arrays.asList(bootstrappedTables);
+          tablesToDrop = ListUtils.subtract(tableList, tablesToBootstrap);
+          LOG.info("Optimised bootstrap for database {} with drop table list as {} and bootstrap table list as {}",
+              dbNameToLoadIn, tablesToDrop, tablesToBootstrap);
         }
         this.bootstrapIterator = new BootstrapEventsIterator(
                 new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index bcbf20c53e..b043426c49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -80,21 +81,19 @@ public class LoadPartitions {
   private final ReplicationMetricCollector metricCollector;
 
   private final ImportTableDesc tableDesc;
+  private final List<String> tablesToBootstrap;
   private Table table;
 
-  public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
-                        TableEvent event, String dbNameToLoadIn,
-                        TableContext tableContext, ReplicationMetricCollector metricCollector) throws HiveException {
-    this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null,
-      metricCollector, null, PartitionState.Stage.PARTITION);
+  public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker, TableEvent event, String dbNameToLoadIn,
+      TableContext tableContext, ReplicationMetricCollector metricCollector, List<String> tablesToBootstrap) throws HiveException {
+    this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, metricCollector, null,
+        PartitionState.Stage.PARTITION, tablesToBootstrap);
   }
 
-  public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
-                        TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
-                        AlterTableAddPartitionDesc lastReplicatedPartition,
-                        ReplicationMetricCollector metricCollector,
-                        AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc,
-                        ReplicationState.PartitionState.Stage lastReplicatedStage) throws HiveException {
+  public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext, TaskTracker limiter,
+      TableEvent event, String dbNameToLoadIn, AlterTableAddPartitionDesc lastReplicatedPartition,
+      ReplicationMetricCollector metricCollector, AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc,
+      PartitionState.Stage lastReplicatedStage, List<String> tablesToBootstrap) throws HiveException {
     this.tracker = new TaskTracker(limiter);
     this.event = event;
     this.context = context;
@@ -106,6 +105,7 @@ public class LoadPartitions {
     this.metricCollector = metricCollector;
     this.lastReplicatedPartitionDesc = lastReplicatedPartitionDesc;
     this.lastReplicatedStage = lastReplicatedStage;
+    this.tablesToBootstrap = tablesToBootstrap;
   }
 
   public TaskTracker tasks() throws Exception {
@@ -134,6 +134,17 @@ public class LoadPartitions {
     } else {
       // existing
       if (table.isPartitioned()) {
+        if (tablesToBootstrap.stream().anyMatch(table.getTableName()::equalsIgnoreCase)) {
+          Hive hiveDb = Hive.get(context.hiveConf);
+          // Collect the non-existing partitions to drop.
+          List<Partition> partitions = hiveDb.getPartitions(table);
+          List<String> newParts = event.partitions(tableDesc);
+          for (Partition part : partitions) {
+            if (!newParts.contains(part.getName())) {
+              hiveDb.dropPartition(table.getDbName(), table.getTableName(), part.getValues(), true);
+            }
+          }
+        }
         List<AlterTableAddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
         if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
           updateReplicationState(initialReplicationState());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 824d8c70a0..54ca6e02fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -82,7 +82,7 @@ public class LoadTable {
     this.metricCollector = metricCollector;
   }
 
-  public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception {
+  public TaskTracker tasks(boolean isBootstrapDuringInc, boolean isSecondFailover) throws Exception {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
     // If tblName is null, then we default to the table name specified in _metadata, which is good.
     // or are both specified, in which case, that's what we are intended to create the new table as.
@@ -95,6 +95,10 @@ public class LoadTable {
     ImportTableDesc tableDesc = event.tableDesc(dbName);
     Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
 
+    if (isSecondFailover) {
+      tableDesc.setForceOverwriteTable();
+    }
+
     // Normally, on import, trying to create a table or a partition in a db that does not yet exist
     // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
     // to create tasks to create a table inside a db that as-of-now does not exist, but there is
@@ -112,7 +116,7 @@ public class LoadTable {
     }
 
     Task<?> tblRootTask = null;
-    ReplLoadOpType loadTblType = getLoadTableType(table, isBootstrapDuringInc);
+    ReplLoadOpType loadTblType = getLoadTableType(table, isBootstrapDuringInc, isSecondFailover);
     switch (loadTblType) {
       case LOAD_NEW:
         break;
@@ -138,7 +142,7 @@ public class LoadTable {
        or in the case of an unpartitioned table. In all other cases, it should
        behave like a noop or a pure MD alter.
     */
-    newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
+    newTableTasks(tableDesc, tblRootTask, tableLocationTuple, isSecondFailover && table != null);
 
     // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
     // bootstrap, we skip current table update.
@@ -159,9 +163,13 @@ public class LoadTable {
     return tracker;
   }
 
-  private ReplLoadOpType getLoadTableType(Table table, boolean isBootstrapDuringInc)
+  private ReplLoadOpType getLoadTableType(Table table, boolean isBootstrapDuringInc, boolean isSecondFailover)
           throws InvalidOperationException, HiveException {
-    if (table == null) {
+    // In case of second iteration of the optimised bootstrap, we don't drop & re create the table, instead we
+    // re-write the metadata, in order to prevent deletion of the data in the target cluster. So, that while copying
+    // data from the source cluster, we just operate on the modified/missing/additional files and can get rid of
+    // copying the files which are already there on both cluster and are same.
+    if (table == null || isSecondFailover) {
       return ReplLoadOpType.LOAD_NEW;
     }
 
@@ -180,11 +188,11 @@ public class LoadTable {
     return ReplLoadOpType.LOAD_REPLACE;
   }
 
-  private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask, TableLocationTuple tuple)
+  private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask, TableLocationTuple tuple, boolean setLoc)
       throws Exception {
     Table table = tblDesc.toTable(context.hiveConf);
     ReplicationSpec replicationSpec = event.replicationSpec();
-    if (!tblDesc.isExternal()) {
+    if (!tblDesc.isExternal() && !setLoc) {
       tblDesc.setLocation(null);
     }
     Task<?> createTableTask =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index dd2224c64e..48d3f5cec7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -49,6 +49,7 @@ public class ReplicationSpec {
   //Determine if replication is done using repl or export-import
   private boolean isRepl = false;
   private boolean isMetadataOnlyForExternalTables = false;
+  private boolean isForceOverwrite = false;
 
   public void setInReplicationScope(boolean inReplicationScope) {
     isInReplicationScope = inReplicationScope;
@@ -418,4 +419,12 @@ public class ReplicationSpec {
   public void setRepl(boolean repl) {
     isRepl = repl;
   }
+
+  public boolean isForceOverwrite() {
+    return isForceOverwrite;
+  }
+
+  public void setForceOverwrite(boolean forceOverwrite) {
+    isForceOverwrite = forceOverwrite;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 232ea09035..23128ebfed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -190,4 +190,8 @@ public class ImportTableDesc {
   public Long getReplWriteId() {
     return this.createTblDesc.getReplWriteId();
   }
+
+  public void setForceOverwriteTable(){
+    this.createTblDesc.getReplicationSpec().setForceOverwrite(true);
+  }
 }