Posted to commits@hive.apache.org by an...@apache.org on 2018/05/25 07:43:35 UTC

hive git commit: HIVE-19499: Bootstrap REPL LOAD shall add tasks to create checkpoints for db/tables/partitions. (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)

Repository: hive
Updated Branches:
  refs/heads/master c358ef5af -> 8d5e8a60a


HIVE-19499: Bootstrap REPL LOAD shall add tasks to create checkpoints for db/tables/partitions. (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d5e8a60
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d5e8a60
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d5e8a60

Branch: refs/heads/master
Commit: 8d5e8a60ae6eeaaac8369a718fa7d5c04d142f57
Parents: c358ef5
Author: Anishek Agarwal <an...@gmail.com>
Authored: Fri May 25 13:13:22 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Fri May 25 13:13:22 2018 +0530

----------------------------------------------------------------------
 ...TestReplicationScenariosAcrossInstances.java | 46 ++++++++++++++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java | 15 +++++
 .../hadoop/hive/ql/exec/repl/ReplUtils.java     | 58 ++++++++++++++++++++
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    |  3 +-
 .../ql/exec/repl/bootstrap/ReplLoadWork.java    |  2 +
 .../exec/repl/bootstrap/load/LoadDatabase.java  |  6 ++
 .../exec/repl/bootstrap/load/TaskTracker.java   | 19 ++++++-
 .../bootstrap/load/table/LoadPartitions.java    | 40 +++++++-------
 .../repl/bootstrap/load/table/LoadTable.java    | 34 +++++-------
 .../exec/repl/bootstrap/load/util/Context.java  |  4 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 49 ++++++++---------
 11 files changed, 203 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
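
For orientation: the change stamps every bootstrapped database, table and partition on the replica with a "hive.repl.ckpt.key" property recording the dump directory it was loaded from, so that a retried REPL LOAD on the same dump can recognize already-loaded objects. Below is a minimal, self-contained sketch of that idea; the helper names and the dump path are illustrative and not part of this patch. The actual stamping is done by the parameter/DDL tasks added in ReplUtils, LoadDatabase, LoadTable and LoadPartitions in the diff that follows, and the skip-on-retry decision is only anticipated by the comments in this commit.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; mirrors the semantics of ReplUtils.REPL_CHECKPOINT_KEY,
// not Hive's actual classes.
public class CheckpointSketch {
  static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";

  // On a successful bootstrap load, the object's parameters are stamped with the
  // dump directory (done via parameter/ALTER-properties tasks in the patch).
  static void markCheckpoint(Map<String, String> params, String dumpDir) {
    params.put(REPL_CHECKPOINT_KEY, dumpDir);
  }

  // On a retry with the same dump, an already-stamped object can be skipped.
  // (Hypothetical helper: the skip decision itself is not implemented in this patch.)
  static boolean shouldSkipOnRetry(Map<String, String> params, String dumpDir) {
    return dumpDir.equals(params.get(REPL_CHECKPOINT_KEY));
  }

  public static void main(String[] args) {
    Map<String, String> tableParams = new HashMap<>();
    String dumpDir = "/tmp/repl/dump/12345";                     // hypothetical dump root
    System.out.println(shouldSkipOnRetry(tableParams, dumpDir)); // false: first bootstrap load
    markCheckpoint(tableParams, dumpDir);
    System.out.println(shouldSkipOnRetry(tableParams, dumpDir)); // true: retry on same dump can skip
  }
}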


http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 8caa55c..bcbf113 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,9 +25,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -47,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -54,6 +59,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TestReplicationScenariosAcrossInstances {
   @Rule
@@ -757,4 +763,44 @@ public class TestReplicationScenariosAcrossInstances {
     FileSystem fs = cSerdesTableDumpLocation.getFileSystem(primary.hiveConf);
     assertFalse(fs.exists(cSerdesTableDumpLocation));
   }
+
+  private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
+    assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+    assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
+  }
+
+  @Test
+  public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (id int)")
+            .run("insert into table t1 values (10)")
+            .run("create table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='uk') values ('london')")
+            .run("insert into table t2 partition(country='us') values ('sfo')")
+            .dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2" })
+            .run("select country from t2")
+            .verifyResults(Arrays.asList("india", "uk", "us"));
+
+    Database db = replica.getDatabase(replicatedDbName);
+    verifyIfCkptSet(db.getParameters(), tuple.dumpLocation);
+    Table t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation);
+    Table t2 = replica.getTable(replicatedDbName, "t2");
+    verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation);
+    Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tuple.dumpLocation);
+    Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us"));
+    verifyIfCkptSet(us.getParameters(), tuple.dumpLocation);
+    Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk"));
+    verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index dc31e92..17fd799 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
@@ -319,6 +322,18 @@ public class WarehouseInstance implements Closeable {
     }
   }
 
+  public Database getDatabase(String dbName) throws Exception {
+    return client.getDatabase(dbName);
+  }
+
+  public Table getTable(String dbName, String tableName) throws Exception {
+    return client.getTable(dbName, tableName);
+  }
+
+  public Partition getPartition(String dbName, String tableName, List<String> partValues) throws Exception {
+    return client.getPartition(dbName, tableName, partValues);
+  }
+
   ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) {
     return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
new file mode 100644
index 0000000..cbec3ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+
+public class ReplUtils {
+
+  public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+
+  public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
+          throws SemanticException {
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
+    return TaskFactory.get(replLogWork, conf);
+  }
+
+  public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec,
+                                               String dumpRoot, HiveConf conf) throws SemanticException {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
+
+    AlterTableDesc alterTblDesc =  new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+    alterTblDesc.setProps(mapProp);
+    alterTblDesc.setOldName(
+            StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName()));
+    if (partSpec != null) {
+      alterTblDesc.setPartSpec(partSpec);
+    }
+    return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 748d318..97917f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -72,7 +72,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   protected int execute(DriverContext driverContext) {
     try {
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-      Context context = new Context(conf, getHive(), work.sessionStateLineageState, driverContext.getCtx());
+      Context context = new Context(work.dumpDirectory, conf, getHive(),
+              work.sessionStateLineageState, driverContext.getCtx());
       TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
       /*
           for now for simplicity we are doing just one directory ( one database ), come back to use

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index c1a9a62..048727f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -33,6 +33,7 @@ import java.io.Serializable;
 public class ReplLoadWork implements Serializable {
   final String dbNameToLoadIn;
   final String tableNameToLoadIn;
+  final String dumpDirectory;
   private final BootstrapEventsIterator iterator;
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
@@ -50,6 +51,7 @@ public class ReplLoadWork implements Serializable {
       throws IOException {
     this.tableNameToLoadIn = tableNameToLoadIn;
     sessionStateLineageState = lineageState;
+    this.dumpDirectory = dumpDirectory;
     this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
     this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
     this.dbNameToLoadIn = dbNameToLoadIn;

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 537c5e9..c5f2779 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -84,6 +85,11 @@ public class LoadDatabase {
      */
     Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
     parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+
+    // Add the checkpoint key to the Database, binding it to the current dump directory.
+    // So, on a retry using the same dump, we can skip the Database object update.
+    parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, context.dumpDirectory);
+
     createDbDesc.setDatabaseProperties(parameters);
     // note that we do not set location - for repl load, we want that auto-created.
     createDbDesc.setIfNotExists(false);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
index 95f484b..f8f0801 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,8 +66,21 @@ public class TaskTracker {
     updateTaskCount(task, visited);
   }
 
-  public void updateTaskCount(Task<? extends Serializable> task,
-                              List <Task<? extends Serializable>> visited) {
+  // Traverses the DAG formed by the tasks list and adds the given dependent task to
+  // the tail of each task chain.
+  public void addDependentTask(Task<? extends Serializable> dependent) {
+    if (tasks.isEmpty()) {
+      addTask(dependent);
+    } else {
+      DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
+
+      List<Task<? extends Serializable>> visited = new ArrayList<>();
+      updateTaskCount(dependent, visited);
+    }
+  }
+
+  private void updateTaskCount(Task<? extends Serializable> task,
+                               List <Task<? extends Serializable>> visited) {
     numberOfTasks += 1;
     visited.add(task);
     if (task.getChildTasks() != null) {
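
The new TaskTracker.addDependentTask() hangs the given task off every leaf of the tracked task DAG (via DAGTraversal and AddDependencyToLeaves) and updates the task count, falling back to addTask() when the tracker is empty. A toy, self-contained sketch of that leaf-append traversal, using a stand-in Node class rather than Hive's Task/DAGTraversal classes:

import java.util.ArrayList;
import java.util.List;

// Toy stand-in for Hive's task DAG; illustration only.
public class LeafAppendSketch {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  // Append 'dependent' under every leaf reachable from the given roots,
  // mirroring what DAGTraversal + AddDependencyToLeaves do for real tasks.
  static void addDependentToLeaves(List<Node> roots, Node dependent) {
    for (Node n : roots) {
      if (n.children.isEmpty()) {
        n.children.add(dependent);
      } else {
        addDependentToLeaves(n.children, dependent);
      }
    }
  }

  public static void main(String[] args) {
    Node copy = new Node("copyTask");
    Node addPart = new Node("addPartTask");
    Node move = new Node("movePartitionTask");
    copy.children.add(addPart);
    addPart.children.add(move);

    List<Node> roots = new ArrayList<>();
    roots.add(copy);
    addDependentToLeaves(roots, new Node("checkpointTask"));

    // The dependent task is now the tail of the chain: copy -> addPart -> move -> checkpoint
    System.out.println(move.children.get(0).name);
  }
}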

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index df7f30d..870f70a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -55,8 +53,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -112,21 +110,6 @@ public class LoadPartitions {
     }
   }
 
-  private void createTableReplLogTask() throws SemanticException {
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,
-                                            tableDesc.getTableName(), tableDesc.tableType());
-    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);
-
-    if (tracker.tasks().isEmpty()) {
-      tracker.addTask(replLogTask);
-    } else {
-      DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask));
-
-      List<Task<? extends Serializable>> visited = new ArrayList<>();
-      tracker.updateTaskCount(replLogTask, visited);
-    }
-  }
-
   public TaskTracker tasks() throws SemanticException {
     try {
       /*
@@ -143,7 +126,9 @@ public class LoadPartitions {
           updateReplicationState(initialReplicationState());
           if (!forNewTable().hasReplicationState()) {
             // Add ReplStateLogTask only if no pending table load tasks left for next cycle
-            createTableReplLogTask();
+            Task<? extends Serializable> replLogTask
+                    = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+            tracker.addDependentTask(replLogTask);
           }
           return tracker;
         }
@@ -155,7 +140,9 @@ public class LoadPartitions {
             updateReplicationState(initialReplicationState());
             if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
               // Add ReplStateLogTask only if no pending table load tasks left for next cycle
-              createTableReplLogTask();
+              Task<? extends Serializable> replLogTask
+                      = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+              tracker.addDependentTask(replLogTask);
             }
             return tracker;
           }
@@ -229,8 +216,19 @@ public class LoadPartitions {
 
     Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath);
 
+    // Set Checkpoint task as dependent to add partition tasks. So, if same dump is retried for
+    // bootstrap, we skip current partition update.
+    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+            tableDesc,
+            (HashMap<String, String>)partSpec.getPartSpec(),
+            context.dumpDirectory,
+            context.hiveConf
+    );
+
     copyTask.addDependentTask(addPartTask);
     addPartTask.addDependentTask(movePartitionTask);
+    movePartitionTask.addDependentTask(ckptTask);
+
     return copyTask;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index d10ca76..f2b7fa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -27,13 +26,11 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -52,7 +49,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -79,21 +75,6 @@ public class LoadTable {
     this.tracker = new TaskTracker(limiter);
   }
 
-  private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException {
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,tableName, tableType);
-    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
-    DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask));
-
-    if (tracker.tasks().isEmpty()) {
-      tracker.addTask(replLogTask);
-    } else {
-      DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask));
-
-      List<Task<? extends Serializable>> visited = new ArrayList<>();
-      tracker.updateTaskCount(replLogTask, visited);
-    }
-  }
-
   public TaskTracker tasks() throws SemanticException {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
     // If tblName is null, then we default to the table name specified in _metadata, which is good.
@@ -159,9 +140,20 @@ public class LoadTable {
         existingTableTasks(tableDesc, table, replicationSpec);
       }
 
+      // Set Checkpoint task as dependent to create table task. So, if same dump is retried for
+      // bootstrap, we skip current table update.
+      Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+              tableDesc,
+              null,
+              context.dumpDirectory,
+              context.hiveConf
+      );
       if (!isPartitioned(tableDesc)) {
-        createTableReplLogTask(tableDesc.getTableName(), tableDesc.tableType());
+        Task<? extends Serializable> replLogTask
+                = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+        ckptTask.addDependentTask(replLogTask);
       }
+      tracker.addDependentTask(ckptTask);
       return tracker;
     } catch (Exception e) {
       throw new SemanticException(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
index 7eae1ea..b90da06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.session.LineageState;
 
 public class Context {
+  public final String dumpDirectory;
   public final HiveConf hiveConf;
   public final Hive hiveDb;
   public final Warehouse warehouse;
@@ -38,9 +39,10 @@ public class Context {
   public final LineageState sessionStateLineageState;
 
 
-  public Context(HiveConf hiveConf, Hive hiveDb,
+  public Context(String dumpDirectory, HiveConf hiveConf, Hive hiveDb,
       LineageState lineageState,
       org.apache.hadoop.hive.ql.Context nestedContext) throws MetaException {
+    this.dumpDirectory = dumpDirectory;
     this.hiveConf = hiveConf;
     this.hiveDb = hiveDb;
     this.warehouse = new Warehouse(hiveConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 21b9865..a2b5897 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1136,11 +1136,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
 
-        // Clean the txn to writeid map/TXN_COMPONENTS for the given table as we bootstrap here
-        String sql = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
-                + " and t2w_table = " + quoteString(tblName);
-        LOG.debug("Going to execute delete <" + sql + ">");
-        stmt.executeUpdate(sql);
+        // Check if this txn state is already replicated for this given table. If yes, then it is
+        // an idempotent case and we just return.
+        String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+                        + " and nwi_table = " + quoteString(tblName);
+        LOG.debug("Going to execute query <" + sql + ">");
+
+        rs = stmt.executeQuery(sql);
+        if (rs.next()) {
+          LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: "
+                  + dbName + "." + tblName);
+          rollbackDBConn(dbConn);
+          return;
+        }
 
         if (numAbortedWrites > 0) {
           // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted.
@@ -1173,30 +1181,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         // There are some txns in the list which has no write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
-        // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
-        sql = sqlGenerator.addForUpdateClause(
-                "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
-                        + " and nwi_table = " + quoteString(tblName));
-        LOG.debug("Going to execute query <" + sql + ">");
-
+        // It is expected that NEXT_WRITE_ID has no entry for this table, so insert it directly.
         long nextWriteId = validWriteIdList.getHighWatermark() + 1;
-        rs = stmt.executeQuery(sql);
-        if (!rs.next()) {
-          // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
-          sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-                  + quoteString(dbName) + "," + quoteString(tblName) + ","
-                  + Long.toString(nextWriteId) + ")";
-          LOG.debug("Going to execute insert <" + sql + ">");
-          stmt.execute(sql);
-        } else {
-          // Update the NEXT_WRITE_ID for the given table with hwm+1 from source
-          sql = "update NEXT_WRITE_ID set nwi_next = " + (nextWriteId)
-                  + " where nwi_database = " + quoteString(dbName)
-                  + " and nwi_table = " + quoteString(tblName);
-          LOG.debug("Going to execute update <" + sql + ">");
-          stmt.executeUpdate(sql);
-        }
 
+        // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
+        sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+                + quoteString(dbName) + "," + quoteString(tblName) + ","
+                + Long.toString(nextWriteId) + ")";
+        LOG.debug("Going to execute insert <" + sql + ">");
+        stmt.execute(sql);
+
+        LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." + tblName);
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
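
The TxnHandler change above makes replTableWriteIdState idempotent for bootstrap: instead of first deleting the table's TXN_TO_WRITE_ID rows, it checks whether NEXT_WRITE_ID already has an entry for the table and returns early if so, otherwise it inserts hwm+1 directly. A self-contained sketch of that control flow, with an in-memory map standing in for the NEXT_WRITE_ID metastore table (illustrative only, not JDBC against the real schema):

import java.util.HashMap;
import java.util.Map;

// Illustration only: a HashMap stands in for the NEXT_WRITE_ID metastore table.
public class WriteIdStateSketch {
  // key: "db.table", value: nwi_next
  static final Map<String, Long> NEXT_WRITE_ID = new HashMap<>();

  // Mirrors the patched control flow: if state already exists, the call is a
  // retry of an already-applied bootstrap and becomes a no-op; otherwise record hwm+1.
  static void replTableWriteIdState(String db, String table, long highWatermark) {
    String key = db + "." + table;
    if (NEXT_WRITE_ID.containsKey(key)) {
      System.out.println("Idempotent flow: writeId state already applied for " + key);
      return;
    }
    NEXT_WRITE_ID.put(key, highWatermark + 1);
    System.out.println("WriteId state applied for " + key + ", nwi_next=" + (highWatermark + 1));
  }

  public static void main(String[] args) {
    replTableWriteIdState("repl_db", "t2", 5L); // first bootstrap load: state recorded
    replTableWriteIdState("repl_db", "t2", 5L); // retried load on same dump: skipped
  }
}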