Posted to commits@hive.apache.org by ma...@apache.org on 2019/02/26 14:01:34 UTC

[hive] branch master updated: HIVE-21197 : Hive replication can add duplicate data during migration to a target with hive.strict.managed.tables enabled (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 20abb4e  HIVE-21197 : Hive replication can add duplicate data during migration to a target with hive.strict.managed.tables enabled (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
20abb4e is described below

commit 20abb4e06b6a613a9b039f5de1b3bbcbefa2222f
Author: Mahesh Kumar Behera <mb...@hortonworks.com>
AuthorDate: Tue Feb 26 19:22:55 2019 +0530

    HIVE-21197 : Hive replication can add duplicate data during migration to a target with hive.strict.managed.tables enabled (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
---
 .../TestReplicationScenariosAcrossInstances.java   |  18 +-
 .../parse/TestReplicationWithTableMigrationEx.java | 351 +++++++++++++++++++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |   8 +-
 .../hive/ql/txn/compactor/TestCompactor.java       |  56 ++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java    |  36 +++
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   | 119 +++++--
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  20 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   5 +-
 .../ql/exec/repl/bootstrap/load/LoadDatabase.java  |  23 +-
 .../repl/bootstrap/load/table/TableContext.java    |   9 +
 .../incremental/IncrementalLoadTasksBuilder.java   |   7 +
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  15 +
 .../org/apache/hadoop/hive/ql/parse/EximUtil.java  |   3 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |  36 ++-
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |  11 +
 .../ql/parse/repl/dump/io/TableSerializer.java     |   3 +-
 .../repl/load/message/AlterDatabaseHandler.java    |   3 +-
 .../parse/repl/load/message/CommitTxnHandler.java  |   5 +
 .../org/apache/hadoop/hive/ql/plan/DDLWork.java    |  16 +
 .../apache/hadoop/hive/ql/plan/ReplCopyWork.java   |  10 +
 .../plan/ReplRemoveFirstIncLoadPendFlagDesc.java   |  65 ++++
 .../hive/ql/txn/compactor/CompactorThread.java     |   9 +
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  12 +
 .../ql/txn/compactor/MetaStoreCompactorThread.java |  14 +
 .../ql/txn/compactor/RemoteCompactorThread.java    |  14 +
 .../metastore/InjectableBehaviourObjectStore.java  |  25 ++
 26 files changed, 837 insertions(+), 56 deletions(-)
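
The core of the change, for readers skimming the diff below: a bootstrap REPL LOAD now marks the target database (or table, for table-level replication) with the parameter hive.repl.first.inc.pending, consumers such as the compactor and ReplCopyTask treat the object as not yet consistent while that flag is present, and the first successful incremental load removes it. A minimal, self-contained sketch of that lifecycle follows; the class, main method, and the remove() call are illustrative only, while the property name and the isFirstIncPending logic mirror the patch.

    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch (not part of the patch) of the hive.repl.first.inc.pending lifecycle.
    public class FirstIncPendingSketch {
      static final String REPL_FIRST_INC_PENDING_FLAG = "hive.repl.first.inc.pending";

      // Mirrors ReplUtils.isFirstIncPending() added by this commit.
      static boolean isFirstIncPending(Map<String, String> parameters) {
        if (parameters == null) {
          return false;
        }
        // An absent flag means the object was created by a user or has already seen its first
        // incremental load, so normal behaviour applies.
        return "true".equalsIgnoreCase(parameters.get(REPL_FIRST_INC_PENDING_FLAG));
      }

      public static void main(String[] args) {
        Map<String, String> dbParams = new HashMap<>();
        dbParams.put(REPL_FIRST_INC_PENDING_FLAG, "true");  // set by bootstrap REPL LOAD
        System.out.println(isFirstIncPending(dbParams));    // true -> block re-dump, skip compaction
        dbParams.remove(REPL_FIRST_INC_PENDING_FLAG);       // cleared after first incremental load
        System.out.println(isFirstIncPending(dbParams));    // false -> normal behaviour
      }
    }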

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 3639ab1..69d2648 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -610,6 +610,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .run("show tables")
         .verifyResults(new String[] { "t1" });
 
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters()));
+
     replica.load("", incrementalTuple.dumpLocation)
         .run("show databases")
         .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo })
@@ -620,6 +624,11 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .run("show tables")
         .verifyResults(new String[] { "t1", "t2" });
 
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters()));
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbTwo).getParameters()));
+
     /*
        Start of cleanup
     */
@@ -1012,7 +1021,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName, null);
 
     // Bootstrap Repl A -> B
-    WarehouseInstance.Tuple tupleReplica = replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+    replica.load(replicatedDbName, tuplePrimary.dumpLocation)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuplePrimary.lastReplicationId)
             .run("show tblproperties t1('custom.property')")
@@ -1020,9 +1029,14 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dumpFailure(replicatedDbName, null)
             .run("alter database " + replicatedDbName
                     + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')")
-            .dump(replicatedDbName, null);
+            .dumpFailure(replicatedDbName, null); // cannot dump the db before the first successful incremental load is done.
+
+    // do an empty incremental load to allow dump of replicatedDbName
+    WarehouseInstance.Tuple temp = primary.dump(primaryDbName, tuplePrimary.lastReplicationId);
+    replica.load(replicatedDbName, temp.dumpLocation); // first successful incremental load.
 
     // Bootstrap Repl B -> C
+    WarehouseInstance.Tuple tupleReplica = replica.dump(replicatedDbName, null);
     String replDbFromReplica = replicatedDbName + "_dupe";
     replica.load(replDbFromReplica, tupleReplica.dumpLocation)
             .run("use " + replDbFromReplica)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
new file mode 100644
index 0000000..4637da1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.junit.*;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable;
+import static org.junit.Assert.*;
+
+/**
+ * TestReplicationWithTableMigrationEx - tests replication from Hive2 to Hive3 (strict managed tables).
+ */
+public class TestReplicationWithTableMigrationEx {
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigrationEx.class);
+  private static WarehouseInstance primary, replica;
+  private String primaryDbName, replicatedDbName;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrideProperties = new HashMap<>();
+    internalBeforeClassSetup(overrideProperties);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws Exception {
+    HiveConf conf = new HiveConf(TestReplicationWithTableMigrationEx.class);
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    final DistributedFileSystem fs = miniDFSCluster.getFileSystem();
+    HashMap<String, String> hiveConfigs = new HashMap<String, String>() {{
+      put("fs.defaultFS", fs.getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.strict.managed.tables", "true");
+      put("hive.metastore.transactional.event.listeners", "");
+    }};
+    replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
+
+    HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
+      put("fs.defaultFS", fs.getUri().toString());
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.support.concurrency", "false");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+      put("hive.strict.managed.tables", "false");
+    }};
+    configsForPrimary.putAll(overrideConfigs);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+  }
+
+  private void prepareData(String primaryDbName) throws Throwable {
+    primary.run("use " + primaryDbName)
+        .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
+        .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) "
+                + "into 3 buckets stored as orc ")
+        .run("insert into tacid values(1)")
+        .run("insert into tacid values(2)")
+        .run("insert into tacid values(3)")
+        .run("alter table tacidpart add partition(country='france')")
+        .run("insert into tacidpart partition(country='india') values('mumbai')")
+        .run("insert into tacidpart partition(country='us') values('sf')")
+        .run("insert into tacidpart partition(country='france') values('paris')");
+    assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid")));
+    assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart")));
+  }
+
+  private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
+    replica.run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"tacid", "tacidpart"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(lastReplId)
+            .run("select count(*) from tacid")
+            .verifyResult("3")
+            .run("select id from tacid order by id")
+            .verifyResults(new String[]{"1", "2", "3"})
+            .run("select count(*) from tacidpart")
+            .verifyResult("3")
+            .run("select country from tacidpart order by country")
+            .verifyResults(new String[] {"france", "india", "us"});
+
+    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
+    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart")));
+  }
+
+  private WarehouseInstance.Tuple dumpWithLastEventIdHacked(int eventId) throws Throwable {
+    BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId> callerVerifier
+            = new BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId>() {
+      @Override
+      public CurrentNotificationEventId apply(CurrentNotificationEventId id) {
+        try {
+          LOG.warn("GetCurrentNotificationEventIdBehaviour called");
+          injectionPathCalled = true;
+          // keep events to replay during incremental
+          id.setEventId(eventId);
+          return id;
+        } catch (Throwable throwable) {
+          throwable.printStackTrace();
+          return null;
+        }
+      }
+    };
+
+    InjectableBehaviourObjectStore.setGetCurrentNotificationEventIdBehaviour(callerVerifier);
+    try {
+      return primary.dump(primaryDbName, null);
+    } finally {
+      InjectableBehaviourObjectStore.resetGetCurrentNotificationEventIdBehaviour();
+      callerVerifier.assertInjectionsPerformed(true, false);
+    }
+  }
+
+  @Test
+  public void testConcurrentOpDuringBootStrapDumpCreateTableReplay() throws Throwable {
+    prepareData(primaryDbName);
+
+    // dump with an operation performed after the last repl id is fetched.
+    WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+
+    // next incremental dump
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+  }
+
+  @Test
+  public void testConcurrentOpDuringBootStrapDumpInsertReplay() throws Throwable {
+    prepareData(primaryDbName);
+
+    // dump with an operation performed after the last repl id is fetched.
+    WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+
+    // next incremental dump
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+  }
+
+  @Test
+  public void testTableLevelDumpMigration() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (i int, j int)")
+            .dump(primaryDbName+".t1", null);
+    replica.run("create database " + replicatedDbName);
+    replica.loadWithoutExplain(replicatedDbName + ".t1", tuple.dumpLocation);
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+    assertTrue(ReplUtils.isFirstIncPending(replica.getTable(replicatedDbName, "t1").getParameters()));
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("insert into t1 values (1, 2)")
+            .dump(primaryDbName+".t1", tuple.lastReplicationId);
+    replica.loadWithoutExplain(replicatedDbName + ".t1", tuple.dumpLocation);
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+    assertFalse(ReplUtils.isFirstIncPending(replica.getTable(replicatedDbName, "t1").getParameters()));
+  }
+
+  @Test
+  public void testConcurrentOpDuringBootStrapDumpInsertOverwrite() throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
+            .run("insert into tacid values(1)")
+            .run("insert into tacid values(2)")
+            .run("insert into tacid values(3)")
+            .run("insert overwrite table tacid values(4)")
+            .run("insert into tacid values(5)");
+
+    // dump with an operation performed after the last repl id is fetched.
+    WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"tacid"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select count(*) from tacid")
+            .verifyResult("2")
+            .run("select id from tacid order by id")
+            .verifyResults(new String[]{"4", "5"});
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+
+    // next incremental dump
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"tacid"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select count(*) from tacid")
+            .verifyResult("2")
+            .run("select id from tacid order by id")
+            .verifyResults(new String[]{"4", "5",});
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+  }
+
+  private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
+    BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean> callerVerifier
+            = new BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) {
+        injectionPathCalled = true;
+        LOG.warn("InjectableBehaviourObjectStore called for Verifier - Table: " + args.tblName);
+        if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
+          LOG.warn("Verifier - DB: " + args.dbName
+                  + " Constraint Table: " + args.constraintTblName);
+          return false;
+        }
+        if (args.tblName != null) {
+          LOG.warn("Verifier - Table: " + args.tblName);
+          return !args.tblName.equalsIgnoreCase(tbl);
+        }
+        return true;
+      }
+    };
+    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
+    try {
+      List<String> withClause = Collections.singletonList("'hive.metastore.transactional.event.listeners'='"
+              + DbNotificationListener.class.getCanonicalName() + "'");
+      replica.loadFailure(replicatedDbName, dumpLocation, withClause);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier();
+    }
+    callerVerifier.assertInjectionsPerformed(true, false);
+  }
+
+  @Test
+  public void testIncLoadPenFlagPropAlterDB() throws Throwable {
+    prepareData(primaryDbName);
+
+    // dump with an operation performed after the last repl id is fetched.
+    WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+    assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("alter database " + primaryDbName + " set dbproperties('dummy_key'='dummy_val')")
+           .run("create table tbl_temp (fld int)")
+            .dump(primaryDbName, tuple.lastReplicationId);
+
+    loadWithFailureInAddNotification("tbl_temp", tuple.dumpLocation);
+    Database replDb = replica.getDatabase(replicatedDbName);
+    assertTrue(ReplUtils.isFirstIncPending(replDb.getParameters()));
+    assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
+    assertTrue(replDb.getParameters().get("dummy_key").equalsIgnoreCase("dummy_val"));
+
+    // next incremental dump
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+  }
+
+  @Test
+  public void testIncLoadPenFlagWithMoveOptimization() throws Throwable {
+    List<String> withClause = Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
+
+    prepareData(primaryDbName);
+
+    // dump with an operation performed after the last repl id is fetched.
+    WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
+    replica.load(replicatedDbName, tuple.dumpLocation, withClause);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+
+    // next incremental dump
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+    replica.load(replicatedDbName, tuple.dumpLocation, withClause);
+    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index c0d416c..56eae91 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -121,9 +121,7 @@ public class WarehouseInstance implements Closeable {
   private void initialize(String cmRoot, String externalTableWarehouseRoot, String warehouseRoot,
       Map<String, String> overridesForHiveConf) throws Exception {
     hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
-    for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
-      hiveConf.set(entry.getKey(), entry.getValue());
-    }
+
     String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
     if (metaStoreUri != null) {
       hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
@@ -153,6 +151,10 @@ public class WarehouseInstance implements Closeable {
     System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
     System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
 
+    for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
+      hiveConf.set(entry.getKey(), entry.getValue());
+    }
+
     MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true);
 
     // Add the below mentioned dependency in metastore/pom.xml file. For postgres need to copy postgresql-42.2.1.jar to
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index dc7b287..61be5a3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1536,6 +1536,62 @@ public class TestCompactor {
     Assert.assertEquals("The hash codes must be equal", compactionInfo.hashCode(), compactionInfo1.hashCode());
   }
 
+  @Test
+  public void testDisableCompactionDuringReplLoad() throws Exception {
+    String tblName = "discomp";
+    String database = "discomp_db";
+    executeStatementOnDriver("drop database if exists " + database + " cascade", driver);
+    executeStatementOnDriver("create database " + database, driver);
+    executeStatementOnDriver("CREATE TABLE " + database + "." + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(ds string)" +
+            " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+            " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + database + "." + tblName + " partition (ds) values (1, 'fred', " +
+            "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    executeStatementOnDriver("ALTER TABLE " + database + "." + tblName +
+            " SET TBLPROPERTIES ( 'hive.repl.first.inc.pending' = 'true')", driver);
+    List<ShowCompactResponseElement> compacts = getCompactionList();
+    Assert.assertEquals(0, compacts.size());
+
+    executeStatementOnDriver("alter database " + database +
+            " set dbproperties ('hive.repl.first.inc.pending' = 'true')", driver);
+    executeStatementOnDriver("ALTER TABLE " + database + "." + tblName +
+            " SET TBLPROPERTIES ( 'hive.repl.first.inc.pending' = 'false')", driver);
+    compacts = getCompactionList();
+    Assert.assertEquals(0, compacts.size());
+
+    executeStatementOnDriver("alter database " + database +
+            " set dbproperties ('hive.repl.first.inc.pending' = 'false')", driver);
+    executeStatementOnDriver("ALTER TABLE " + database + "." + tblName +
+            " SET TBLPROPERTIES ( 'hive.repl.first.inc.pending' = 'false')", driver);
+    compacts = getCompactionList();
+    Assert.assertEquals(2, compacts.size());
+    List<String> partNames = new ArrayList<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals(database, compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    Assert.assertEquals("ds=today", partNames.get(1));
+    Assert.assertEquals("ds=yesterday", partNames.get(0));
+    executeStatementOnDriver("drop database if exists " + database + " cascade", driver);
+
+    // Finish any scheduled compactions
+    runWorker(conf);
+    runCleaner(conf);
+  }
+
+  private List<ShowCompactResponseElement> getCompactionList() throws Exception {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    runInitiator(conf);
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    return rsp.getCompacts();
+  }
+
   private void writeBatch(org.apache.hive.hcatalog.streaming.StreamingConnection connection,
     DelimitedInputWriter writer,
     boolean closeEarly) throws InterruptedException, org.apache.hive.hcatalog.streaming.StreamingException {
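
The test above only exercises the behaviour; the gating itself lives in the compactor changes (CompactorThread, Initiator, MetaStoreCompactorThread, RemoteCompactorThread) listed in the diffstat but not shown in this excerpt. Below is a hedged sketch of the rule the test verifies, assuming ReplUtils is on the classpath; the helper class and method name are hypothetical.

    import java.util.Map;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;

    // Hypothetical helper: compaction is initiated only when neither the table nor its database
    // still carries hive.repl.first.inc.pending.
    class CompactionGateSketch {
      static boolean compactionAllowed(Map<String, String> dbParams, Map<String, String> tblParams) {
        return !ReplUtils.isFirstIncPending(dbParams) && !ReplUtils.isFirstIncPending(tblParams);
      }
    }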
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1ab4d62..0bfff08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -225,6 +225,7 @@ import org.apache.hadoop.hive.ql.plan.PrivilegeDesc;
 import org.apache.hadoop.hive.ql.plan.PrivilegeObjectDesc;
 import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReplRemoveFirstIncLoadPendFlagDesc;
 import org.apache.hadoop.hive.ql.plan.RevokeDesc;
 import org.apache.hadoop.hive.ql.plan.RoleDDLDesc;
 import org.apache.hadoop.hive.ql.plan.ShowColumnsDesc;
@@ -286,6 +287,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.common.util.AnnotationUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.stringtemplate.v4.ST;
@@ -661,6 +663,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       if (work.getAlterMaterializedViewDesc() != null) {
         return alterMaterializedView(db, work.getAlterMaterializedViewDesc());
       }
+
+      if (work.getReplSetFirstIncLoadFlagDesc() != null) {
+        return remFirstIncPendFlag(db, work.getReplSetFirstIncLoadFlagDesc());
+      }
     } catch (Throwable e) {
       failed(e);
       return 1;
@@ -5199,6 +5205,36 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     return retval;
   }
 
+  private int remFirstIncPendFlag(Hive hive, ReplRemoveFirstIncLoadPendFlagDesc desc) throws HiveException, TException {
+    String dbNameOrPattern = desc.getDatabaseName();
+    String tableNameOrPattern = desc.getTableName();
+    Map<String, String> parameters;
+    // For a database-level load, tableNameOrPattern will be null. The flag is set only on the database for a db-level load.
+    if (tableNameOrPattern != null && !tableNameOrPattern.isEmpty()) {
+      // For a table-level load, dbNameOrPattern is the db name, not a pattern.
+      for (String tableName : Utils.matchesTbl(hive, dbNameOrPattern, tableNameOrPattern)) {
+        org.apache.hadoop.hive.metastore.api.Table tbl = hive.getMSC().getTable(dbNameOrPattern, tableName);
+        parameters = tbl.getParameters();
+        String incPendPara = parameters != null ? parameters.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG) : null;
+        if (incPendPara != null) {
+          parameters.remove(ReplUtils.REPL_FIRST_INC_PENDING_FLAG);
+          hive.getMSC().alter_table(dbNameOrPattern, tableName, tbl);
+        }
+      }
+    } else {
+      for (String dbName : Utils.matchesDb(hive, dbNameOrPattern)) {
+        Database database = hive.getMSC().getDatabase(dbName);
+        parameters = database.getParameters();
+        String incPendPara = parameters != null ? parameters.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG) : null;
+        if (incPendPara != null) {
+          parameters.remove(ReplUtils.REPL_FIRST_INC_PENDING_FLAG);
+          hive.getMSC().alterDatabase(dbName, database);
+        }
+      }
+    }
+    return 0;
+  }
+
   /*
   uses the authorizer from SessionState will need some more work to get this to run in parallel,
   however this should not be a bottle neck so might not need to parallelize this.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 179f291..55a0c1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -33,8 +33,10 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Serializable;
+import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.ListIterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +63,75 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     super();
   }
 
+  // If a file is already present in the base directory, then remove it from the list.
+  // See HIVE-21197 for more details.
+  private void updateSrcFileListForDupCopy(FileSystem dstFs, Path toPath, List<ReplChangeManager.FileInfo> srcFiles,
+                                           long writeId, int stmtId) throws IOException {
+    FileStatus[] statuses;
+    try {
+      statuses = dstFs.listStatus(toPath, path -> {
+        String fn = path.getName();
+        try {
+          return dstFs.getFileStatus(path).isDirectory() && fn.startsWith(AcidUtils.BASE_PREFIX);
+        } catch (IOException e) {
+          LOG.error("File listing failed for " + toPath, e);
+          throw new RuntimeException(e.getMessage());
+        }
+      });
+    } catch (FileNotFoundException e) {
+      LOG.debug("Path {} does not exist, will be created before copy", toPath);
+      return;
+    }
+
+    if (statuses.length > 1) {
+      // If more than one base directory is present, then one or more replace operations have been done. A duplicate
+      // check after that may cause data loss, as the check would run against the first base directory,
+      // which is no longer valid.
+      LOG.info("Found {} base directories in path {}; skipping the duplicate check.",
+              statuses.length, toPath);
+      return;
+    }
+
+    ListIterator<ReplChangeManager.FileInfo> iter = srcFiles.listIterator();
+    Path basePath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(true, writeId, writeId, stmtId));
+    while (iter.hasNext()) {
+      Path filePath = new Path(basePath, iter.next().getSourcePath().getName());
+      if (dstFs.exists(filePath)) {
+        LOG.debug("File " + filePath + " is already present in base directory. So removing it from the list.");
+        iter.remove();
+      }
+    }
+  }
+
+  private void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List<ReplChangeManager.FileInfo> srcFiles)
+          throws IOException {
+    for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+      if (srcFile.isUseSourcePath()) {
+        continue;
+      }
+      String destFileName = srcFile.getCmPath().getName();
+      Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
+      Path destFile = new Path(destRoot, destFileName);
+      if (dstFs.exists(destFile)) {
+        String destFileWithSourceName = srcFile.getSourcePath().getName();
+        Path newDestFile = new Path(destRoot, destFileWithSourceName);
+
+        // If the new file exists, then delete it before renaming to avoid a rename failure. If the copy is done
+        // directly to the table path (bypassing the staging directory), then there might be some stale files from a
+        // previous incomplete/failed load. No need to recycle, as these are stale files.
+        if (dstFs.exists(newDestFile)) {
+          LOG.debug(" file " + newDestFile + " is deleted before renaming");
+          dstFs.delete(newDestFile, true);
+        }
+        boolean result = dstFs.rename(destFile, newDestFile);
+        if (!result) {
+          throw new IllegalStateException(
+                  "could not rename " + destFile.getName() + " to " + newDestFile.getName());
+        }
+      }
+    }
+  }
+
   @Override
   protected int execute(DriverContext driverContext) {
     LOG.debug("ReplCopyTask.execute()");
@@ -120,6 +191,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         }
 
         if (work.isCopyToMigratedTxnTable()) {
+          if (work.isNeedCheckDuplicateCopy()) {
+            updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
+                    ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
+            if (srcFiles.isEmpty()) {
+              LOG.info("All files are already present in the base directory. Skipping copy task.");
+              return 0;
+            }
+          }
           // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it
           // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory.
           // The toPath received in ReplCopyWork is pointing to table/partition base location.
@@ -133,8 +212,12 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
             return 6;
           }
           long writeId = Long.parseLong(writeIdString);
-          toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId,
-                  driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement()));
+          // Set the stmt id to 0 for bootstrap load, as the directory needs to be located during incremental load to
+          // avoid any duplicate copy from the source. See HIVE-21197 for more details.
+          int stmtId = (writeId == ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID) ?
+                  ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
+                  driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement();
+          toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
         }
       } else {
         // This flow is usually taken for IMPORT command
@@ -179,31 +262,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
       // If a file is copied from CM path, then need to rename them using original source file name
       // This is needed to avoid having duplicate files in target if same event is applied twice
       // where the first event refers to source path and  second event refers to CM path
-      for (ReplChangeManager.FileInfo srcFile : srcFiles) {
-        if (srcFile.isUseSourcePath()) {
-          continue;
-        }
-        String destFileName = srcFile.getCmPath().getName();
-        Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
-        Path destFile = new Path(destRoot, destFileName);
-        if (dstFs.exists(destFile)) {
-          String destFileWithSourceName = srcFile.getSourcePath().getName();
-          Path newDestFile = new Path(destRoot, destFileWithSourceName);
-
-          // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done
-          // directly to table path (bypassing staging directory) then there might be some stale files from previous
-          // incomplete/failed load. No need of recycle as this is a case of stale file.
-          if (dstFs.exists(newDestFile)) {
-            LOG.debug(" file " + newDestFile + " is deleted before renaming");
-            dstFs.delete(newDestFile, true);
-          }
-          boolean result = dstFs.rename(destFile, newDestFile);
-          if (!result) {
-            throw new IllegalStateException(
-                "could not rename " + destFile.getName() + " to " + newDestFile.getName());
-          }
-        }
-      }
+      renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
       return 0;
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
@@ -271,12 +330,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
       ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
-      if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
+      if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) {
         rcwork.setDeleteDestIfExist(true);
         rcwork.setAutoPurge(isAutoPurge);
         rcwork.setNeedRecycle(needRecycle);
       }
       rcwork.setCopyToMigratedTxnTable(copyToMigratedTxnTable);
+      // For the replace case, the duplicate check should not be done. The new base directory automatically makes the
+      // older data invisible. Doing the duplicate check and skipping the copy would cause consistency issues if
+      // multiple replace events get replayed in the first incremental load.
+      rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace());
       LOG.debug("ReplCopyTask:\trcwork");
       if (replicationSpec.isLazy()) {
         LOG.debug("ReplCopyTask:\tlazy");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index acfa354..3704344 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -255,9 +255,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
     assert (bootDumpBeginReplId >= 0L);
 
+    LOG.info("Bootstrap Dump for db {} and table {}", work.dbNameOrPattern, work.tableNameOrPattern);
+
     String validTxnList = getValidTxnListForReplDump(hiveDb);
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+      Database db = hiveDb.getDatabase(dbName);
+      if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) {
+        // A replicated (target) database is not in a consistent state until after the first successful incremental
+        // load. Do not allow replicating this database to a new target.
+        throw new HiveException("Replication dump not allowed for replicated database" +
+                " with first incremental dump pending : " + dbName);
+      }
       replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
               Utils.getAllTables(hiveDb, dbName).size(),
               hiveDb.getAllFunctions().size());
@@ -274,9 +283,16 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
           LOG.debug(
               "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+
           try {
-            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName,
-                                                                                        conf);
+            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
+            Table table = tableTuple != null ? tableTuple.object : null;
+            if (table != null && ReplUtils.isFirstIncPending(table.getParameters())) {
+              // A replicated (target) table is not in a consistent state until after the first successful incremental
+              // load. Do not allow replicating this table to a new target.
+              throw new HiveException("Replication dump not allowed for replicated table" +
+                      " with first incremental dump pending : " + tblName);
+            }
             if (shouldWriteExternalTableLocationInfo
                     && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
               LOG.debug("Adding table {} to external tables list", tblName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 4dc14f4..7062eda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -129,7 +129,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         case Database:
           DatabaseEvent dbEvent = (DatabaseEvent) next;
           dbTracker =
-              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
+              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, work.tableNameToLoadIn, loadTaskTracker)
                   .tasks();
           loadTaskTracker.update(dbTracker);
           if (work.hasDbState()) {
@@ -370,6 +370,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
 
       // If incremental events are already applied, then check and perform if need to bootstrap any tables.
       if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+        // No need to set the incremental-load-pending flag for external tables, as the files are copied to the same
+        // path for external tables, unlike migrated txn tables. Currently, bootstrap during incremental load is done
+        // only for external tables.
         if (work.hasBootstrapLoadTasks()) {
           LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
                   + "mode after applying all events.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 0fd305a..d6ccf58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -48,13 +48,16 @@ public class LoadDatabase {
 
   private final DatabaseEvent event;
   private final String dbNameToLoadIn;
+  private final boolean isTableLevelLoad;
 
-  public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
+  public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, String tblNameToLoadIn,
       TaskTracker loadTaskTracker) {
     this.context = context;
     this.event = event;
     this.dbNameToLoadIn = dbNameToLoadIn;
     this.tracker = new TaskTracker(loadTaskTracker);
+    // TODO: LoadDatabase should not be called for a table-level load.
+    isTableLevelLoad = tblNameToLoadIn != null && !tblNameToLoadIn.isEmpty();
   }
 
   public TaskTracker tasks() throws SemanticException {
@@ -123,7 +126,7 @@ public class LoadDatabase {
     CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
     createDbDesc.setName(dbObj.getName());
     createDbDesc.setComment(dbObj.getDescription());
-    createDbDesc.setDatabaseProperties(updateDbProps(dbObj, context.dumpDirectory));
+    createDbDesc.setDatabaseProperties(updateDbProps(dbObj, context.dumpDirectory, !isTableLevelLoad));
 
     // note that we do not set location - for repl load, we want that auto-created.
     createDbDesc.setIfNotExists(false);
@@ -135,7 +138,8 @@ public class LoadDatabase {
   }
 
   private Task<? extends Serializable> alterDbTask(Database dbObj) {
-    return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory), context.hiveConf);
+    return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory, !isTableLevelLoad),
+            context.hiveConf);
   }
 
   private Task<? extends Serializable> setOwnerInfoTask(Database dbObj) {
@@ -146,7 +150,7 @@ public class LoadDatabase {
     return TaskFactory.get(work, context.hiveConf);
   }
 
-  private static Map<String, String> updateDbProps(Database dbObj, String dumpDirectory) {
+  private static Map<String, String> updateDbProps(Database dbObj, String dumpDirectory, boolean needSetIncFlag) {
     /*
     explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going
     to run multiple times and explicit logic is in place which prevents updates to tables when db level
@@ -158,6 +162,15 @@ public class LoadDatabase {
     // Add the checkpoint key to the Database binding it to current dump directory.
     // So, if retry using same dump, we shall skip Database object update.
     parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, dumpDirectory);
+
+    if (needSetIncFlag) {
+      // This flag is cleared after the first incremental load is done. The repl copy task uses it to check whether a
+      // duplicate-file check is required, and compaction uses it to check whether compaction can be done for this
+      // database. If compaction is done before the first incremental load, the duplicate check will fail, as
+      // compaction may change the directory structure.
+      parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true");
+    }
+
     return parameters;
   }
 
@@ -173,7 +186,7 @@ public class LoadDatabase {
 
     public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
         TaskTracker loadTaskTracker) {
-      super(context, event, dbNameToLoadIn, loadTaskTracker);
+      super(context, event, dbNameToLoadIn, null, loadTaskTracker);
     }
 
     @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
index 8e01fb1..dbda41d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
 public class TableContext {
   final String dbNameToLoadIn;
@@ -43,6 +44,14 @@ public class TableContext {
       throws SemanticException {
     if (StringUtils.isNotBlank(tableNameToLoadIn)) {
       importTableDesc.setTableName(tableNameToLoadIn);
+
+      // For a table-level load, add this property to avoid a duplicate copy.
+      // This flag is cleared after the first incremental load is done. The repl copy task uses it
+      // to check whether a duplicate-file check is required, and compaction uses it to check
+      // whether compaction can be done for this table. If compaction is done before the first
+      // incremental load, the duplicate check will fail, as compaction may change the directory
+      // structure.
+      importTableDesc.getTblProps().put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true");
     }
     return importTableDesc;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 3e0c969..3938833 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
+import org.apache.hadoop.hive.ql.plan.ReplRemoveFirstIncLoadPendFlagDesc;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.slf4j.Logger;
 
@@ -164,6 +165,12 @@ public class IncrementalLoadTasksBuilder {
                   lastEventid);
         }
       }
+
+      ReplRemoveFirstIncLoadPendFlagDesc desc = new ReplRemoveFirstIncLoadPendFlagDesc(dbName, tableName);
+      Task<? extends Serializable> updateIncPendTask = TaskFactory.get(new DDLWork(inputs, outputs, desc), conf);
+      taskChainTail.addDependentTask(updateIncPendTask);
+      taskChainTail = updateIncPendTask;
+
       Map<String, String> dbProps = new HashMap<>();
       dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
       ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 91eeb13..cb81dd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -60,6 +60,7 @@ public class ReplUtils {
 
   public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
   public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+  public static final String REPL_FIRST_INC_PENDING_FLAG = "hive.repl.first.inc.pending";
 
   // write id allocated in the current execution context which will be passed through config to be used by different
   // tasks.
@@ -75,6 +76,10 @@ public class ReplUtils {
   // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
   public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L;
 
+  // We keep the statement id as 0 so that the base directory is created with 0 and is easy to find during the
+  // duplicate check. Note: the stmt id is not used for the base directory now, but it is kept to avoid misuse later.
+  public static final int REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID = 0;
+
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
@@ -187,4 +192,14 @@ public class ReplUtils {
       }
     };
   }
+
+  public static boolean isFirstIncPending(Map<String, String> parameters) {
+    if (parameters == null) {
+      return false;
+    }
+    String firstIncPendFlag = parameters.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG);
+    // If the flag is not set, then we assume the first incremental load is done, as the database/table may have been
+    // created by the user and not through replication.
+    return firstIncPendFlag != null && !firstIncPendFlag.isEmpty() && "true".equalsIgnoreCase(firstIncPendFlag);
+  }
 }
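
A short sketch of why the two constants above are fixed (assumption: the exact directory-name format comes from AcidUtils.baseOrDeltaSubdir and may differ from the format string used here): by pinning the bootstrap migration write id to 1 (and reserving stmt id 0), the bootstrap copy and the later duplicate check in ReplCopyTask always derive the same base subdirectory, so files written during bootstrap can be located during the first incremental load.

    // Hypothetical stand-in for AcidUtils.baseOrDeltaSubdir(true, writeId, writeId, stmtId);
    // the real format is Hive-internal. Per the comment above, the stmt id is currently not part
    // of the base directory name, so only the write id is used here.
    public class BootstrapBaseDirSketch {
      static final long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L;

      static String bootstrapBaseSubdir(long writeId) {
        // Both the bootstrap copy and the duplicate check use the same fixed write id, so they
        // always agree on the directory they read and write.
        return String.format("base_%07d", writeId);
      }

      public static void main(String[] args) {
        System.out.println(bootstrapBaseSubdir(REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));  // base_0000001
      }
    }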
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 156f755..8c34a61 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -275,7 +275,8 @@ public class EximUtil {
       tmpParameters.entrySet()
                 .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
                             || e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY)
-                            || e.getKey().equals(ReplChangeManager.SOURCE_OF_REPLICATION));
+                            || e.getKey().equals(ReplChangeManager.SOURCE_OF_REPLICATION)
+                            || e.getKey().equals(ReplUtils.REPL_FIRST_INC_PENDING_FLAG));
       dbObj.setParameters(tmpParameters);
     }
     try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index d4fb191..ed06352 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -472,10 +473,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     LoadFileType lft;
     boolean isAutoPurge = false;
     boolean needRecycle = false;
-    boolean copyToMigratedTxnTable = false;
+    boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
 
-    if (replicationSpec.isInReplicationScope() &&
-            x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) {
+    if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
+            x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
       lft = LoadFileType.IGNORE;
       destPath = loadPath = tgtPath;
       isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
@@ -485,7 +486,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
         needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
       }
-      copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
     } else {
       if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
         String mmSubdir = replace ? AcidUtils.baseDir(writeId)
@@ -531,8 +531,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
 
 
-    if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table) &&
-            !replicationSpec.isMigratingToTxnTable()) {
+    if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
       LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
               Collections.singletonList(destPath),
               Collections.singletonList(tgtPath),
@@ -603,7 +602,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     boolean isAutoPurge = false;
     boolean needRecycle = false;
-    boolean copyToMigratedTxnTable = false;
+    boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
 
     if (shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)
             || (tblDesc.isExternal() && tblDesc.getLocation() == null)) {
@@ -624,8 +623,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       LoadFileType loadFileType;
       Path destPath;
-      if (replicationSpec.isInReplicationScope() &&
-              x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) {
+      if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
+              x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
         loadFileType = LoadFileType.IGNORE;
         destPath = tgtLocation;
         isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
@@ -635,7 +634,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
           needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
         }
-        copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
       } else {
         loadFileType = replicationSpec.isReplace() ?
                 LoadFileType.REPLACE_ALL :
@@ -675,8 +673,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       // Note: this sets LoadFileType incorrectly for ACID; is that relevant for import?
       //       See setLoadFileType and setIsAcidIow calls elsewhere for an example.
-      if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps()) &&
-              !replicationSpec.isMigratingToTxnTable()) {
+      if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
         LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
                 Collections.singletonList(destPath),
                 Collections.singletonList(tgtLocation),
@@ -1136,6 +1133,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     Task<?> dropTblTask = null;
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
+    boolean firstIncPending;
 
     // Normally, on import, trying to create a table or a partition in a db that does not yet exist
     // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
@@ -1147,6 +1145,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (!waitOnPrecursor){
         throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
       }
+      // For warehouse-level replication, if the database itself is getting created in this load, then there is no
+      // need to check for duplicate copy. Check HIVE-21197 for more detail.
+      firstIncPending = false;
+    } else {
+      // For database replication, get the flag from database parameter. Check HIVE-21197 for more detail.
+      firstIncPending = ReplUtils.isFirstIncPending(parentDb.getParameters());
     }
 
     if (table != null) {
@@ -1164,6 +1168,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (x.getEventType() == DumpType.EVENT_CREATE_TABLE) {
         dropTblTask = dropTableTask(table, x, replicationSpec);
         table = null;
+      } else if (!firstIncPending) {
+        // If the pending flag is not set on the database, then check the table parameters (for table-level load).
+        // Check HIVE-21197 for more detail.
+        firstIncPending = ReplUtils.isFirstIncPending(table.getParameters());
       }
     } else {
       // If table doesn't exist, allow creating a new one only if the database state is older than the update.
@@ -1175,6 +1183,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
+    // For first incremental load just after bootstrap, we need to check for duplicate copy.
+    // Check HIVE-21197 for more detail.
+    replicationSpec.setNeedDupCopyCheck(firstIncPending);
+
     if (updatedMetadata != null) {
       updatedMetadata.set(replicationSpec.getReplicationState(),
                           tblDesc.getDatabaseName(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index d55ee20..055d454 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -49,6 +49,7 @@ public class ReplicationSpec {
   private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
   private boolean isMigratingToTxnTable = false;
   private boolean isMigratingToExternalTable = false;
+  private boolean needDupCopyCheck = false;
 
   // Key definitions related to replication
   public enum KEY {
@@ -426,4 +427,14 @@ public class ReplicationSpec {
       destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastReplId);
     }
   }
+
+  public boolean needDupCopyCheck() {
+    return needDupCopyCheck;
+  }
+
+  public void setNeedDupCopyCheck(boolean isFirstIncPending) {
+    // Duplicate file check during copy is required until after first successful incremental load.
+    // Check HIVE-21197 for more detail.
+    this.needDupCopyCheck = isFirstIncPending;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 4d8ffe9..552183a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -75,7 +75,8 @@ public class TableSerializer implements JsonWriter.Serializer {
     Map<String, String> parameters = table.getParameters();
     if (parameters != null) {
       parameters.entrySet()
-              .removeIf(e -> e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY));
+              .removeIf(e -> (e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY) ||
+                      e.getKey().equals(ReplUtils.REPL_FIRST_INC_PENDING_FLAG)));
     }
 
     if (additionalPropertiesProvider.isInReplicationScope()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index e68e055..c5dfe7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -70,7 +70,8 @@ public class AlterDatabaseHandler extends AbstractMessageHandler {
           if (key.startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
                   || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())
                   || key.equals(ReplUtils.REPL_CHECKPOINT_KEY)
-                  || key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)) {
+                  || key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)
+                  || key.equals(ReplUtils.REPL_FIRST_INC_PENDING_FLAG)) {
             continue;
           }
           newDbProps.put(key, entry.getValue());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index 0619bd3..9f0f705 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -107,6 +108,10 @@ public class CommitTxnHandler extends AbstractMessageHandler {
       updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
     }
     context.log.debug("Added Commit txn task : {}", commitTxnTask.getId());
+    if (tasks.isEmpty()) {
+      // The commit txn task will be used for setting the last repl id.
+      return Collections.singletonList(commitTxnTask);
+    }
     DAGTraversal.traverse(tasks, new AddDependencyToLeaves(commitTxnTask));
     return tasks;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
index 8ed3b03..eb6011f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
@@ -100,6 +100,8 @@ public class DDLWork implements Serializable {
 
   private CreateOrDropTriggerToPoolMappingDesc triggerToPoolMappingDesc;
 
+  private ReplRemoveFirstIncLoadPendFlagDesc replSetFirstIncLoadFlagDesc;
+
   boolean needLock = false;
 
   /**
@@ -612,6 +614,12 @@ public class DDLWork implements Serializable {
     this.triggerToPoolMappingDesc = triggerToPoolMappingDesc;
   }
 
+  public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+                 ReplRemoveFirstIncLoadPendFlagDesc replSetFirstIncLoadFlagDesc) {
+    this(inputs, outputs);
+    this.replSetFirstIncLoadFlagDesc = replSetFirstIncLoadFlagDesc;
+  }
+
   /**
    * @return Create Database descriptor
    */
@@ -1356,4 +1364,12 @@ public class DDLWork implements Serializable {
   public void setTriggerToPoolMappingDesc(CreateOrDropTriggerToPoolMappingDesc triggerToPoolMappingDesc) {
     this.triggerToPoolMappingDesc = triggerToPoolMappingDesc;
   }
+
+  public ReplRemoveFirstIncLoadPendFlagDesc getReplSetFirstIncLoadFlagDesc() {
+    return replSetFirstIncLoadFlagDesc;
+  }
+
+  public void setReplSetFirstIncLoadFlagDesc(ReplRemoveFirstIncLoadPendFlagDesc replSetFirstIncLoadFlagDesc) {
+    this.replSetFirstIncLoadFlagDesc = replSetFirstIncLoadFlagDesc;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
index 4d34f8d..c631f3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -59,6 +59,8 @@ public class ReplCopyWork extends CopyWork {
 
   private boolean copyToMigratedTxnTable;
 
+  private boolean checkDuplicateCopy = false;
+
   public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) {
     super(srcPath, destPath, errorOnSrcEmpty);
   }
@@ -110,4 +112,12 @@ public class ReplCopyWork extends CopyWork {
   public void setCopyToMigratedTxnTable(boolean copyToMigratedTxnTable) {
     this.copyToMigratedTxnTable = copyToMigratedTxnTable;
   }
+
+  public boolean isNeedCheckDuplicateCopy() {
+    return checkDuplicateCopy;
+  }
+
+  public void setCheckDuplicateCopy(boolean flag) {
+    checkDuplicateCopy = flag;
+  }
 }
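
As a rough sketch of how these pieces fit together (hypothetical glue code; in this patch the actual wiring lives in ImportSemanticAnalyzer and ReplCopyTask), the pending flag read from the database parameters drives ReplicationSpec.setNeedDupCopyCheck, which in turn decides whether the copy work must guard against files already copied during the migrating bootstrap:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.api.Database;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
    import org.apache.hadoop.hive.ql.plan.ReplCopyWork;

    public class DupCopyCheckSketch {
      // Hypothetical helper: enable the duplicate-file check only while the first incremental
      // load is still pending and the data is being copied into a table migrated to ACID.
      static ReplCopyWork buildCopyWork(Database parentDb, ReplicationSpec spec,
                                        Path srcPath, Path destPath) {
        spec.setNeedDupCopyCheck(ReplUtils.isFirstIncPending(parentDb.getParameters()));

        ReplCopyWork work = new ReplCopyWork(srcPath, destPath, false);
        work.setCheckDuplicateCopy(spec.needDupCopyCheck() && spec.isMigratingToTxnTable());
        return work;
      }
    }
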
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplRemoveFirstIncLoadPendFlagDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplRemoveFirstIncLoadPendFlagDesc.java
new file mode 100644
index 0000000..23d5825
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplRemoveFirstIncLoadPendFlagDesc.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.Serializable;
+
+/**
+ * ReplRemoveFirstIncLoadPendFlagDesc -- Removes the first incremental load pending flag from the database/table
+ * properties if it is already present.
+ *
+ */
+@Explain(displayName = "Remove First Incr Load Pend Flag", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class ReplRemoveFirstIncLoadPendFlagDesc extends DDLDesc implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  String databaseName;
+  String tableName;
+  boolean incLoadPendingFlag;
+
+  /**
+   * For serialization only.
+   */
+  public ReplRemoveFirstIncLoadPendFlagDesc() {
+  }
+
+  public ReplRemoveFirstIncLoadPendFlagDesc(String databaseName, String tableName) {
+    super();
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+  }
+
+  @Explain(displayName="db_name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  @Explain(displayName="table_name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+}
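
To make the intent of the new descriptor concrete, a hypothetical snippet (the real task creation in this patch happens in ReplLoadTask / IncrementalLoadTasksBuilder) that wraps it into a DDLWork so DDLTask can clear the flag once the first incremental load has succeeded; passing a null table name is assumed here to mean a database-level operation:

    import java.util.HashSet;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.hooks.ReadEntity;
    import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    import org.apache.hadoop.hive.ql.plan.DDLWork;
    import org.apache.hadoop.hive.ql.plan.ReplRemoveFirstIncLoadPendFlagDesc;

    public class RemovePendFlagTaskSketch {
      static Task<?> buildRemovePendFlagTask(String dbName, HiveConf conf) {
        // Null table name: assumed to indicate the flag should be removed from the database itself.
        ReplRemoveFirstIncLoadPendFlagDesc desc = new ReplRemoveFirstIncLoadPendFlagDesc(dbName, null);
        DDLWork work = new DDLWork(new HashSet<ReadEntity>(), new HashSet<WriteEntity>(), desc);
        return TaskFactory.get(work, conf);
      }
    }
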
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index f45140d..94f0031 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -91,6 +93,8 @@ public abstract class CompactorThread extends Thread implements Configurable {
    */
   abstract Table resolveTable(CompactionInfo ci) throws MetaException;
 
+  abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws TException;
+
   /**
    * Get list of partitions by name.
    * @param ci compaction info.
@@ -217,4 +221,9 @@ public abstract class CompactorThread extends Thread implements Configurable {
     thread.init(new AtomicBoolean(), new AtomicBoolean());
     thread.start();
   }
+
+  protected boolean replIsCompactionDisabledForTable(Table tbl) {
+    // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+    return ReplUtils.isFirstIncPending(tbl.getParameters());
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a37c983..deabec6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -104,6 +104,12 @@ public class Initiator extends MetaStoreCompactorThread {
             }
             LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
             try {
+              if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+                // Compaction is disabled for replicated database until after first successful incremental load.
+                LOG.info("Compaction is disabled for database " + ci.dbname);
+                continue;
+              }
+
               Table t = resolveTable(ci);
               if (t == null) {
                 // Most likely this means it's a temp table
@@ -118,6 +124,12 @@ public class Initiator extends MetaStoreCompactorThread {
                 continue;
               }
 
+              if (replIsCompactionDisabledForTable(t)) {
+                // Compaction is disabled for replicated table until after first successful incremental load.
+                LOG.info("Compaction is disabled for table " + ci.getFullTableName());
+                continue;
+              }
+
               // Check to see if this is a table level request on a partitioned table.  If so,
               // then it's a dynamic partitioning case and we shouldn't check the table itself.
               if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 1ddc54d..a6dd4fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -24,10 +24,13 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.thrift.TException;
 
 import java.util.Collections;
 import java.util.List;
@@ -71,6 +74,17 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
     }
   }
 
+  @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
+    try {
+      Database database = rs.getDatabase(getDefaultCatalog(conf), dbName);
+      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+      return ReplUtils.isFirstIncPending(database.getParameters());
+    } catch (NoSuchObjectException e) {
+      LOG.info("Unable to find database " + dbName);
+      return true;
+    }
+  }
+
   @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
     try {
       return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
index 9678786..4235184 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.thrift.TException;
 
 import java.util.Collections;
@@ -53,6 +56,17 @@ public class RemoteCompactorThread extends CompactorThread {
     }
   }
 
+  @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
+    try {
+      Database database = msc.getDatabase(getDefaultCatalog(conf), dbName);
+      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+      return ReplUtils.isFirstIncPending(database.getParameters());
+    } catch (NoSuchObjectException e) {
+      LOG.info("Unable to find database " + dbName);
+      return true;
+    }
+  }
+
   @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
     try {
       return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index 9daff37..6c7fe11 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 
 import static org.junit.Assert.assertEquals;
 
@@ -87,6 +88,9 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
 
   private static com.google.common.base.Function<CallerArguments, Boolean> alterTableModifier = null;
 
+  private static com.google.common.base.Function<CurrentNotificationEventId, CurrentNotificationEventId>
+          getCurrNotiEventIdModifier = null;
+
   // Methods to set/reset getTable modifier
   public static void setGetTableBehaviour(com.google.common.base.Function<Table, Table> modifier){
     getTableModifier = (modifier == null) ? com.google.common.base.Functions.identity() : modifier;
@@ -270,4 +274,25 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
     }
     return super.alterDatabase(catalogName, dbname, db);
   }
+
+  // Methods to set/reset getCurrentNotificationEventId modifier
+  public static void setGetCurrentNotificationEventIdBehaviour(
+          com.google.common.base.Function<CurrentNotificationEventId, CurrentNotificationEventId> modifier){
+    getCurrNotiEventIdModifier = modifier;
+  }
+  public static void resetGetCurrentNotificationEventIdBehaviour(){
+    setGetCurrentNotificationEventIdBehaviour(null);
+  }
+
+  @Override
+  public CurrentNotificationEventId getCurrentNotificationEventId() {
+    CurrentNotificationEventId id = super.getCurrentNotificationEventId();
+    if (getCurrNotiEventIdModifier != null) {
+      id = getCurrNotiEventIdModifier.apply(id);
+      if (id == null) {
+        throw new RuntimeException("InjectableBehaviourObjectStore: Invalid getCurrentNotificationEventId");
+      }
+    }
+    return id;
+  }
 }
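
For completeness, a hedged usage sketch (hypothetical test code, not part of this patch) of the new injection hook: the modifier lets a test make getCurrentNotificationEventId report a stale id, e.g. to reproduce the case where events land after the dump has read the last event id.

    import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
    import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;

    public class NotificationEventIdInjectionSketch {
      public static void runWithLaggingEventId(Runnable testBody) {
        // Report an event id one lower than the real one while the test body runs.
        InjectableBehaviourObjectStore.setGetCurrentNotificationEventIdBehaviour(
            id -> new CurrentNotificationEventId(id.getEventId() - 1));
        try {
          testBody.run();
        } finally {
          InjectableBehaviourObjectStore.resetGetCurrentNotificationEventIdBehaviour();
        }
      }
    }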