You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/06/28 04:19:31 UTC

[1/2] hive git commit: HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

Repository: hive
Updated Branches:
  refs/heads/master 6a8f4cbe3 -> 67b0a67c3


http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
new file mode 100644
index 0000000..4b37c8d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * IncrementalLoadEventsIterator
+ * Helper class to iterate through event dump directory.
+ */
+public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
+  private FileStatus[] eventDirs;
+  private int currentIndex;
+  private int numEvents;
+
+  public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOException {
+    Path eventPath = new Path(loadPath);
+    FileSystem fs = eventPath.getFileSystem(conf);
+    eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs));
+    if ((eventDirs == null) || (eventDirs.length == 0)) {
+      throw new IllegalArgumentException("No data to load in path " + loadPath);
+    }
+    // For event dump, each sub-dir is an individual event dump.
+    // We need to guarantee that the directory listing we got is in order of event id.
+    Arrays.sort(eventDirs, new EventDumpDirComparator());
+    currentIndex = 0;
+    numEvents = eventDirs.length;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (eventDirs != null && currentIndex < numEvents);
+  }
+
+  @Override
+  public FileStatus next() {
+    if (hasNext()) {
+      return eventDirs[currentIndex++];
+    } else {
+      throw new NoSuchElementException("no more events");
+    }
+  }
+
+  public int getNumEvents() {
+    return numEvents;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
new file mode 100644
index 0000000..9e0ce82
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
+
+/**
+ * IncrementalLoad
+ * Iterate through the dump directory and create tasks to load the events.
+ */
+public class IncrementalLoadTasksBuilder {
+  private final String dbName, tableName;
+  private final IncrementalLoadEventsIterator iterator;
+  private HashSet<ReadEntity> inputs;
+  private HashSet<WriteEntity> outputs;
+  private Logger log;
+  private final HiveConf conf;
+  private final ReplLogger replLogger;
+  private static long numIteration;
+
+  public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath,
+                                     IncrementalLoadEventsIterator iterator, HiveConf conf) {
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.iterator = iterator;
+    inputs = new HashSet<>();
+    outputs = new HashSet<>();
+    log = null;
+    this.conf = conf;
+    replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents());
+    numIteration = 0;
+    replLogger.startLog();
+  }
+
+  public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log) throws Exception {
+    Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
+    Task<? extends Serializable> taskChainTail = evTaskRoot;
+    Long lastReplayedEvent = null;
+    this.log = log;
+    numIteration++;
+    this.log.debug("Iteration num " + numIteration);
+    TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
+
+    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
+      FileStatus dir = iterator.next();
+      String location = dir.getPath().toUri().toString();
+      DumpMetaData eventDmd = new DumpMetaData(new Path(location), conf);
+
+      if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName, tableName)) {
+        this.log.debug("Skipping event {} from {} for table {}.{} maxTasks: {}",
+                eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks());
+        continue;
+      }
+
+      this.log.debug("Loading event {} from {} for table {}.{} maxTasks: {}",
+              eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks());
+
+      // event loads will behave similar to table loads, with one crucial difference
+      // precursor order is strict, and each event must be processed after the previous one.
+      // The way we handle this strict order is as follows:
+      // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask)
+      // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
+      // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
+      // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all
+      // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
+      // tasks as follows:
+      //
+      //                 --->ev1.task1--                          --->ev2.task1--
+      //                /               \                        /               \
+      //  evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
+      //                \               /
+      //                 --->ev1.task3--
+      //
+      // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
+      // entire chain
+
+      MessageHandler.Context context = new MessageHandler.Context(dbName, tableName, location,
+              taskChainTail, eventDmd, conf, hive, driverContext.getCtx(), this.log);
+      List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context);
+
+      if ((evTasks != null) && (!evTasks.isEmpty())) {
+        ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
+                dir.getPath().getName(),
+                eventDmd.getDumpType().toString());
+        Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
+        AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask);
+        DAGTraversal.traverse(evTasks, function);
+        this.log.debug("Updated taskChainTail from {}:{} to {}:{}",
+                taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
+        tracker.addTaskList(taskChainTail.getChildTasks());
+        taskChainTail = barrierTask;
+      }
+      lastReplayedEvent = eventDmd.getEventTo();
+    }
+
+    // If any event is there and db name is known, then dump the start and end logs
+    if (!evTaskRoot.equals(taskChainTail) && !iterator.hasNext()) {
+      Map<String, String> dbProps = new HashMap<>();
+      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
+      ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
+      Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
+      taskChainTail.addDependentTask(barrierTask);
+      this.log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+              taskChainTail.getClass(), taskChainTail.getId(),
+              barrierTask.getClass(), barrierTask.getId());
+    }
+    return evTaskRoot;
+  }
+
+  private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
+    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+      if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
+        log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
+                + " is already replayed. LastReplId - " +  Long.parseLong(replLastId));
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName, String tableName) {
+    // if database itself is null then we can not filter out anything.
+    if (dbName == null || dbName.isEmpty()) {
+      return true;
+    } else if ((tableName == null) || (tableName.isEmpty())) {
+      Database database;
+      try {
+        database = Hive.get().getDatabase(dbName);
+        return isEventNotReplayed(database.getParameters(), dir, dumpType);
+      } catch (HiveException e) {
+        //may be the db is getting created in this load
+        log.debug("failed to get the database " + dbName);
+        return true;
+      }
+    } else {
+      Table tbl;
+      try {
+        tbl = Hive.get().getTable(dbName, tableName);
+        return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
+      } catch (HiveException e) {
+        // may be the table is getting created in this load
+        log.debug("failed to get the table " + dbName + "." + tableName);
+        return true;
+      }
+    }
+  }
+
+  private List<Task<? extends Serializable>> analyzeEventLoad(MessageHandler.Context context) throws SemanticException {
+    MessageHandler messageHandler = context.dmd.getDumpType().handler();
+    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
+
+    if (context.precursor != null) {
+      for (Task<? extends Serializable> t : tasks) {
+        context.precursor.addDependentTask(t);
+        log.debug("Added {}:{} as a precursor of {}:{}",
+                context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId());
+      }
+    }
+
+    inputs.addAll(messageHandler.readEntities());
+    outputs.addAll(messageHandler.writeEntities());
+    return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks);
+  }
+
+  private Task<? extends Serializable> tableUpdateReplStateTask(String dbName, String tableName,
+                                                    Map<String, String> partSpec, String replState,
+                                                    Task<? extends Serializable> preCursor) throws SemanticException {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterTableDesc alterTblDesc =  new AlterTableDesc(
+            AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
+    alterTblDesc.setProps(mapProp);
+    alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName));
+    alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
+
+    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterTblDesc), conf);
+
+    // Link the update repl state task with dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(),
+              updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  private Task<? extends Serializable> dbUpdateReplStateTask(String dbName, String replState,
+                                                             Task<? extends Serializable> preCursor) {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(replState, replState));
+    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf);
+
+    // Link the update repl state task with dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(),
+              updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  private List<Task<? extends Serializable>> addUpdateReplStateTasks(boolean isDatabaseLoad,
+                                           UpdatedMetaDataTracker updatedMetadata,
+                                           List<Task<? extends Serializable>> importTasks) throws SemanticException {
+    String replState = updatedMetadata.getReplicationState();
+    String database = updatedMetadata.getDatabase();
+    String table = updatedMetadata.getTable();
+
+    // If no import tasks generated by the event or no table updated for table level load, then no
+    // need to update the repl state to any object.
+    if (importTasks.isEmpty() || (!isDatabaseLoad && (table == null))) {
+      log.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+      return importTasks;
+    }
+
+    // Create a barrier task for dependency collection of import tasks
+    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
+
+    // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
+    for (Task<? extends Serializable> t : importTasks){
+      t.addDependentTask(barrierTask);
+      log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+              t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+    }
+
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    Task<? extends Serializable> updateReplIdTask;
+
+    // If any partition is updated, then update repl state in partition object
+    for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
+      updateReplIdTask = tableUpdateReplStateTask(database, table, partSpec, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    if (table != null) {
+      // If any table/partition is updated, then update repl state in table object
+      updateReplIdTask = tableUpdateReplStateTask(database, table, null, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    // For table level load, need not update replication state for the database
+    if (isDatabaseLoad) {
+      // If any table/partition is updated, then update repl state in db object
+      updateReplIdTask = dbUpdateReplStateTask(database, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    // At least one task would have been added to update the repl state
+    return tasks;
+  }
+
+  public static long getNumIteration() {
+    return numIteration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
new file mode 100644
index 0000000..284796f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+public class AddDependencyToLeaves implements DAGTraversal.Function {
+  private List<Task<? extends Serializable>> postDependencyCollectionTasks;
+
+  public AddDependencyToLeaves(List<Task<? extends Serializable>> postDependencyCollectionTasks) {
+    this.postDependencyCollectionTasks = postDependencyCollectionTasks;
+  }
+
+  public AddDependencyToLeaves(Task<? extends Serializable> postDependencyTask) {
+    this(Collections.singletonList(postDependencyTask));
+  }
+
+
+  @Override
+  public void process(Task<? extends Serializable> task) {
+    if (task.getChildTasks() == null) {
+      postDependencyCollectionTasks.forEach(task::addDependentTask);
+    }
+  }
+
+  @Override
+  public boolean skipProcessing(Task<? extends Serializable> task) {
+    return postDependencyCollectionTasks.contains(task);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
new file mode 100644
index 0000000..618be1d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+
+public class ReplUtils {
+
+  public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+
+  /**
+   * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
+   */
+  public enum ReplLoadOpType {
+    LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
+  }
+
+  public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(
+          Table table, List<Map<String, String>> partitions) throws SemanticException {
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
+    int partPrefixLength = 0;
+    if (partitions.size() > 0) {
+      partPrefixLength = partitions.get(0).size();
+      // pick the length of the first ptn, we expect all ptns listed to have the same number of
+      // key-vals.
+    }
+    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
+    for (Map<String, String> ptn : partitions) {
+      // convert each key-value-map to appropriate expression.
+      ExprNodeGenericFuncDesc expr = null;
+      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+        String key = kvp.getKey();
+        Object val = kvp.getValue();
+        String type = table.getPartColByName(key).getType();
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
+                "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
+        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+      }
+      if (expr != null) {
+        partitionDesc.add(expr);
+      }
+    }
+    if (partitionDesc.size() > 0) {
+      partSpecs.put(partPrefixLength, partitionDesc);
+    }
+    return partSpecs;
+  }
+
+  public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
+          throws SemanticException {
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
+    return TaskFactory.get(replLogWork, conf);
+  }
+
+  public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec,
+                                               String dumpRoot, HiveConf conf) throws SemanticException {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
+
+    AlterTableDesc alterTblDesc =  new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+    alterTblDesc.setProps(mapProp);
+    alterTblDesc.setOldName(
+            StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName()));
+    if (partSpec != null) {
+      alterTblDesc.setPartSpec(partSpec);
+    }
+    return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf);
+  }
+
+  public static boolean replCkptStatus(String dbName, Map<String, String> props, String dumpRoot)
+          throws InvalidOperationException {
+    // If ckpt property not set or empty means, bootstrap is not run on this object.
+    if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) {
+      if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) {
+        return true;
+      }
+      throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot
+              + " is not allowed as the target DB: " + dbName
+              + " is already bootstrap loaded by another Dump " + props.get(REPL_CHECKPOINT_KEY));
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
new file mode 100644
index 0000000..1d01bc9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class will be responsible to track how many tasks have been created,
+ * organization of tasks such that after the number of tasks for next execution are created
+ * we create a dependency collection task(DCT) -> another bootstrap task,
+ * and then add DCT as dependent to all existing tasks that are created so the cycle can continue.
+ */
+public class TaskTracker {
+  private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class);
+  /**
+   * used to identify the list of tasks at root level for a given level like table /  db / partition.
+   * this does not include the task dependency notion of "table tasks < ---- partition task"
+   */
+  private final List<Task<? extends Serializable>> tasks = new ArrayList<>();
+  private ReplicationState replicationState = null;
+  // since tasks themselves can be graphs we want to limit the number of created
+  // tasks including all of dependencies.
+  private int numberOfTasks = 0;
+  private final int maxTasksAllowed;
+
+  public TaskTracker(int defaultMaxTasks) {
+    maxTasksAllowed = defaultMaxTasks;
+  }
+
+  public TaskTracker(TaskTracker existing) {
+    maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks;
+  }
+
+  /**
+   * this method is used to identify all the tasks in a graph.
+   * the graph however might get created in a disjoint fashion, in which case we can just update
+   * the number of tasks using the "update" method.
+   */
+  public void addTask(Task<? extends Serializable> task) {
+    tasks.add(task);
+
+    List <Task<? extends Serializable>> visited = new ArrayList<>();
+    updateTaskCount(task, visited);
+  }
+
+  public void addTaskList(List <Task<? extends Serializable>> taskList) {
+    List <Task<? extends Serializable>> visited = new ArrayList<>();
+    for (Task<? extends Serializable> task : taskList) {
+      if (!visited.contains(task)) {
+        tasks.add(task);
+        updateTaskCount(task, visited);
+      }
+    }
+  }
+
+  // This method is used to traverse the DAG created in tasks list and add the dependent task to
+  // the tail of each task chain.
+  public void addDependentTask(Task<? extends Serializable> dependent) {
+    if (tasks.isEmpty()) {
+      addTask(dependent);
+    } else {
+      DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
+
+      List<Task<? extends Serializable>> visited = new ArrayList<>();
+      updateTaskCount(dependent, visited);
+    }
+  }
+
+  private void updateTaskCount(Task<? extends Serializable> task,
+                               List <Task<? extends Serializable>> visited) {
+    numberOfTasks += 1;
+    visited.add(task);
+    if (task.getChildTasks() != null) {
+      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+        if (visited.contains(childTask)) {
+          continue;
+        }
+        updateTaskCount(childTask, visited);
+      }
+    }
+  }
+
+  public boolean canAddMoreTasks() {
+    return numberOfTasks < maxTasksAllowed;
+  }
+
+  public boolean hasTasks() {
+    return numberOfTasks != 0;
+  }
+
+  public void update(TaskTracker withAnother) {
+    numberOfTasks += withAnother.numberOfTasks;
+    if (withAnother.hasReplicationState()) {
+      this.replicationState = withAnother.replicationState;
+    }
+  }
+
+  public void setReplicationState(ReplicationState state) {
+    this.replicationState = state;
+  }
+
+  public boolean hasReplicationState() {
+    return replicationState != null;
+  }
+
+  public ReplicationState replicationState() {
+    return replicationState;
+  }
+
+  public List<Task<? extends Serializable>> tasks() {
+    return tasks;
+  }
+
+  public void debugLog(String forEventType) {
+    LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks,
+        tasks.size());
+  }
+
+  public int numberOfTasks() {
+    return numberOfTasks;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
index a91f45e..cf54aa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
 import org.apache.hadoop.hive.ql.parse.GenTezWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 0a5ecf9..0a535d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 5aeae16..6ed792c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,33 +29,17 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
-import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
-import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
-import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
-import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 import java.io.FileNotFoundException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -265,45 +248,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
-    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-      if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
-        LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
-                + " is already replayed. LastReplId - " +  Long.parseLong(replLastId));
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
-    // if database itself is null then we can not filter out anything.
-    if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) {
-      return true;
-    } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) {
-      Database database;
-      try {
-        database = Hive.get().getDatabase(dbNameOrPattern);
-        return isEventNotReplayed(database.getParameters(), dir, dumpType);
-      } catch (HiveException e) {
-        //may be the db is getting created in this load
-        LOG.debug("failed to get the database " + dbNameOrPattern);
-        return true;
-      }
-    } else {
-      Table tbl;
-      try {
-        tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern);
-        return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
-      } catch (HiveException e) {
-        // may be the table is getting created in this load
-        LOG.debug("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern);
-        return true;
-      }
-    }
-  }
-
   /*
    * Example dump dirs we need to be able to handle :
    *
@@ -395,7 +339,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-                tblNameOrPattern, queryState.getLineageState());
+                tblNameOrPattern, queryState.getLineageState(), false);
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
         return;
       }
@@ -406,236 +350,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         return;
       }
 
-      FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
-
-      if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
-        throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
-      }
-
-      if (!evDump){
-        // not an event dump, not a table dump - thus, a db dump
-        if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
-          LOG.debug("Found multiple dirs when we expected 1:");
-          for (FileStatus d : dirsInLoadPath) {
-            LOG.debug("> " + d.getPath().toUri().toString());
-          }
-          throw new IllegalArgumentException(
-              "Multiple dirs in "
-                  + loadPath.toUri().toString()
-                  + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
-        }
-
-        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-                queryState.getLineageState());
-        rootTasks.add(TaskFactory.get(replLoadWork, conf));
-        //
-        //        for (FileStatus dir : dirsInLoadPath) {
-        //          analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
-        //        }
-      } else {
-        // Event dump, each sub-dir is an individual event dump.
-        // We need to guarantee that the directory listing we got is in order of evid.
-        Arrays.sort(dirsInLoadPath, new EventDumpDirComparator());
-
-        Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
-        Task<? extends Serializable> taskChainTail = evTaskRoot;
-
-        ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern,
-                loadPath.toString(), dirsInLoadPath.length);
-
-        for (FileStatus dir : dirsInLoadPath){
-          String locn = dir.getPath().toUri().toString();
-          DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
-
-          if (!shouldReplayEvent(dir, eventDmd.getDumpType())) {
-            continue;
-          }
-
-          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
-
-          // event loads will behave similar to table loads, with one crucial difference
-          // precursor order is strict, and each event must be processed after the previous one.
-          // The way we handle this strict order is as follows:
-          // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask)
-          // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
-          // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
-          // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all
-          // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
-          // tasks as follows:
-          //
-          //                 --->ev1.task1--                          --->ev2.task1--
-          //                /               \                        /               \
-          //  evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
-          //                \               /
-          //                 --->ev1.task3--
-          //
-          // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
-          // entire chain
-
-          MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern,
-                                                          tblNameOrPattern, locn, taskChainTail,
-                                                          eventDmd, conf, db, ctx, LOG);
-          List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context);
-
-          if ((evTasks != null) && (!evTasks.isEmpty())){
-            ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
-                                                          dir.getPath().getName(),
-                                                          eventDmd.getDumpType().toString());
-            Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
-            for (Task<? extends Serializable> t : evTasks){
-              t.addDependentTask(barrierTask);
-              LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-                  t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
-            }
-            LOG.debug("Updated taskChainTail from {}:{} to {}:{}",
-                taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
-            taskChainTail = barrierTask;
-          }
-        }
-
-        // If any event is there and db name is known, then dump the start and end logs
-        if (!evTaskRoot.equals(taskChainTail)) {
-          Map<String, String> dbProps = new HashMap<>();
-          dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(dmd.getEventTo()));
-          ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
-          Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
-          taskChainTail.addDependentTask(barrierTask);
-          LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-                  taskChainTail.getClass(), taskChainTail.getId(),
-                  barrierTask.getClass(), barrierTask.getId());
-
-          replLogger.startLog();
-        }
-        rootTasks.add(evTaskRoot);
-      }
-
+      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
+              tblNameOrPattern, queryState.getLineageState(), evDump);
+      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
 
-  private List<Task<? extends Serializable>> analyzeEventLoad(
-          MessageHandler.Context context)
-      throws SemanticException {
-    MessageHandler messageHandler = context.dmd.getDumpType().handler();
-    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
-
-    if (context.precursor != null) {
-      for (Task<? extends Serializable> t : tasks) {
-        context.precursor.addDependentTask(t);
-        LOG.debug("Added {}:{} as a precursor of {}:{}",
-                context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId());
-      }
-    }
-
-    inputs.addAll(messageHandler.readEntities());
-    outputs.addAll(messageHandler.writeEntities());
-    return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName),
-                            messageHandler.getUpdatedMetadata(), tasks);
-  }
-
-  private Task<? extends Serializable> tableUpdateReplStateTask(
-                                                        String dbName,
-                                                        String tableName,
-                                                        Map<String, String> partSpec,
-                                                        String replState,
-                                                        Task<? extends Serializable> preCursor) throws SemanticException {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
-    AlterTableDesc alterTblDesc =  new AlterTableDesc(
-            AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
-    alterTblDesc.setProps(mapProp);
-    alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName));
-    alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
-
-    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-        new DDLWork(inputs, outputs, alterTblDesc), conf);
-
-    // Link the update repl state task with dependency collection task
-    if (preCursor != null) {
-      preCursor.addDependentTask(updateReplIdTask);
-      LOG.debug("Added {}:{} as a precursor of {}:{}",
-              preCursor.getClass(), preCursor.getId(),
-              updateReplIdTask.getClass(), updateReplIdTask.getId());
-    }
-    return updateReplIdTask;
-  }
-
-  private Task<? extends Serializable> dbUpdateReplStateTask(
-                                                        String dbName,
-                                                        String replState,
-                                                        Task<? extends Serializable> preCursor) {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
-    AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
-                            dbName, mapProp, new ReplicationSpec(replState, replState));
-    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-        new DDLWork(inputs, outputs, alterDbDesc), conf);
-
-    // Link the update repl state task with dependency collection task
-    if (preCursor != null) {
-      preCursor.addDependentTask(updateReplIdTask);
-      LOG.debug("Added {}:{} as a precursor of {}:{}",
-              preCursor.getClass(), preCursor.getId(),
-              updateReplIdTask.getClass(), updateReplIdTask.getId());
-    }
-    return updateReplIdTask;
-  }
-
-  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
-          boolean isDatabaseLoad,
-          UpdatedMetaDataTracker updatedMetadata,
-          List<Task<? extends Serializable>> importTasks) throws SemanticException {
-    String replState = updatedMetadata.getReplicationState();
-    String dbName = updatedMetadata.getDatabase();
-    String tableName = updatedMetadata.getTable();
-
-    // If no import tasks generated by the event or no table updated for table level load, then no
-    // need to update the repl state to any object.
-    if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
-      LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
-      return importTasks;
-    }
-
-    // Create a barrier task for dependency collection of import tasks
-    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
-
-    // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
-    for (Task<? extends Serializable> t : importTasks){
-      t.addDependentTask(barrierTask);
-      LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-              t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
-    }
-
-    List<Task<? extends Serializable>> tasks = new ArrayList<>();
-    Task<? extends Serializable> updateReplIdTask;
-
-    // If any partition is updated, then update repl state in partition object
-    for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
-      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
-      tasks.add(updateReplIdTask);
-    }
-
-    if (tableName != null) {
-      // If any table/partition is updated, then update repl state in table object
-      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
-      tasks.add(updateReplIdTask);
-    }
-
-    // For table level load, need not update replication state for the database
-    if (isDatabaseLoad) {
-      // If any table/partition is updated, then update repl state in db object
-      updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
-      tasks.add(updateReplIdTask);
-    }
-
-    // At least one task would have been added to update the repl state
-    return tasks;
-  }
-
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 9fdf742..ecde3ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.thrift.TException;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 70f4fed..f05c231 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index b59cdf2..e68e055 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 939884d..4a2fdd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DDLWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
index 309debe..166cf87 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
index 32b4a72..41ab447 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;


[2/2] hive git commit: HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

Posted by sa...@apache.org.
HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/67b0a67c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/67b0a67c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/67b0a67c

Branch: refs/heads/master
Commit: 67b0a67c324bd78224e0b8338eda63824d104cdc
Parents: 6a8f4cb
Author: Sankar Hariappan <sa...@apache.org>
Authored: Thu Jun 28 09:48:47 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Thu Jun 28 09:48:47 2018 +0530

----------------------------------------------------------------------
 .../TestReplicationScenariosAcidTables.java     |  12 +-
 ...TestReplicationScenariosAcrossInstances.java | 112 +++++-
 ql/if/queryplan.thrift                          |   3 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.cpp   |   8 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.h     |   3 +-
 .../hadoop/hive/ql/plan/api/StageType.java      |   5 +-
 ql/src/gen/thrift/gen-php/Types.php             |   2 +
 ql/src/gen/thrift/gen-py/queryplan/ttypes.py    |   3 +
 ql/src/gen/thrift/gen-rb/queryplan_types.rb     |   5 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |   4 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java  | 344 +++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java  | 113 ++++++
 .../hadoop/hive/ql/exec/repl/ReplUtils.java     | 123 -------
 .../repl/bootstrap/AddDependencyToLeaves.java   |  51 ---
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    | 319 -----------------
 .../ql/exec/repl/bootstrap/ReplLoadWork.java    |  88 -----
 .../filesystem/BootstrapEventsIterator.java     |   9 +
 .../repl/bootstrap/load/LoadConstraint.java     |   1 +
 .../exec/repl/bootstrap/load/LoadDatabase.java  |   5 +-
 .../exec/repl/bootstrap/load/LoadFunction.java  |   3 +-
 .../exec/repl/bootstrap/load/TaskTracker.java   | 135 --------
 .../bootstrap/load/table/LoadPartitions.java    |   6 +-
 .../repl/bootstrap/load/table/LoadTable.java    |   6 +-
 .../repl/bootstrap/load/table/TableContext.java |   2 +-
 .../IncrementalLoadEventsIterator.java          |  73 ++++
 .../IncrementalLoadTasksBuilder.java            | 311 +++++++++++++++++
 .../exec/repl/util/AddDependencyToLeaves.java   |  51 +++
 .../hive/ql/exec/repl/util/ReplUtils.java       | 124 +++++++
 .../hive/ql/exec/repl/util/TaskTracker.java     | 145 ++++++++
 .../ql/optimizer/QueryPlanPostProcessor.java    |   2 +-
 .../apache/hadoop/hive/ql/parse/EximUtil.java   |   2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 287 +---------------
 .../parse/repl/dump/io/PartitionSerializer.java |   2 +-
 .../ql/parse/repl/dump/io/TableSerializer.java  |   2 +-
 .../repl/load/message/AlterDatabaseHandler.java |   2 +-
 .../repl/load/message/DropPartitionHandler.java |   2 +-
 .../bootstrap/AddDependencyToLeavesTest.java    |   1 +
 .../repl/bootstrap/load/TestTaskTracker.java    |   1 +
 38 files changed, 1316 insertions(+), 1051 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index a1498ca..4892486 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -345,7 +345,7 @@ public class TestReplicationScenariosAcidTables {
 
     WarehouseInstance.Tuple incrementalDump =
             primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
-    replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation)
+    replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + incrementalDump.dumpLocation + "'")
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
   }
@@ -395,10 +395,8 @@ public class TestReplicationScenariosAcidTables {
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult("null")
-            .run("show tables")
-            .verifyResults(new String[] { "t1" })
-            .run("select id from t1")
-            .verifyResults(Arrays.asList("1"));
+            .run("show tables like t2")
+            .verifyResults(new String[] { });
 
     // Retry with different dump should fail.
     replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
@@ -413,10 +411,6 @@ public class TestReplicationScenariosAcidTables {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName));
           return false;
         }
-        if (args.tblName != null) {
-          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
-          return args.tblName.equals("t2");
-        }
         return true;
       }
     };

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 0f67174..d0608cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
 import org.apache.hadoop.hive.shims.Utils;
@@ -45,6 +45,8 @@ import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.junit.Assert;
 
 import java.io.IOException;
 import java.net.URI;
@@ -77,10 +79,11 @@ public class TestReplicationScenariosAcrossInstances {
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private static WarehouseInstance primary, replica;
   private String primaryDbName, replicatedDbName;
+  private static HiveConf conf;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    Configuration conf = new Configuration();
+    conf = new HiveConf(TestReplicationScenarios.class);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
@@ -876,6 +879,91 @@ public class TestReplicationScenariosAcrossInstances {
   }
 
   @Test
+  public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (id int)")
+            .run("insert into table t1 values (10)")
+            .run("create table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='uk') values ('london')")
+            .run("insert into table t2 partition(country='us') values ('sfo')")
+            .dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2" })
+            .run("select country from t2")
+            .verifyResults(Arrays.asList("india", "uk", "us"));
+
+    Database db = replica.getDatabase(replicatedDbName);
+    verifyIfCkptSet(db.getParameters(), tuple.dumpLocation);
+    Table t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation);
+    Table t2 = replica.getTable(replicatedDbName, "t2");
+    verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation);
+    Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tuple.dumpLocation);
+    Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us"));
+    verifyIfCkptSet(us.getParameters(), tuple.dumpLocation);
+    Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk"));
+    verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation);
+  }
+
+  @Test
+  public void testIncrementalDumpMultiIteration() throws Throwable {
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(bootstrapTuple.lastReplicationId);
+
+    WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
+            .run("create table table1 (id int) partitioned by (country string)")
+            .run("create table table2 (id int)")
+            .run("create table table3 (id int) partitioned by (country string)")
+            .run("insert into table1 partition(country='india') values(1)")
+            .run("insert into table2 values(2)")
+            .run("insert into table3 partition(country='india') values(3)")
+            .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'"))
+            .status(replicatedDbName)
+            .verifyResult(incremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select id from table1")
+            .verifyResults(new String[] {"1" })
+            .run("select * from table2")
+            .verifyResults(new String[] {"2" })
+            .run("select id from table3")
+            .verifyResults(new String[] {"3" });
+    assert(IncrementalLoadTasksBuilder.getNumIteration() > 1);
+
+    incremental = primary.run("use " + primaryDbName)
+            .run("create table  table5 (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc")
+            .run("create table table4 (i int, j int)")
+            .run("insert into table4 values (1,2)")
+            .dump(primaryDbName, incremental.lastReplicationId);
+
+    Path path = new Path(incremental.dumpLocation);
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] fileStatus = fs.listStatus(path);
+    int numEvents = fileStatus.length - 1; //one is metadata file
+
+    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"))
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" })
+            .run("select i from table4")
+            .verifyResult("1");
+    Assert.assertEquals(IncrementalLoadTasksBuilder.getNumIteration(), numEvents);
+  }
+
+  @Test
   public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable {
     WarehouseInstance.Tuple tuplePrimary = primary
             .run("use " + primaryDbName)
@@ -1087,9 +1175,7 @@ public class TestReplicationScenariosAcrossInstances {
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
-            .verifyResult("null")
-            .run("show tables")
-            .verifyResults(new String[] { "t1" });
+            .verifyResult("null");
     assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
     assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, "t3").size());
     assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, "t3").size());
@@ -1109,10 +1195,6 @@ public class TestReplicationScenariosAcrossInstances {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName));
           return false;
         }
-        if (args.tblName != null) {
-          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
-          return (args.tblName.equals("t2") || args.tblName.equals("t3"));
-        }
         if (args.constraintTblName != null) {
           LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName));
           return (args.constraintTblName.equals("t1") || args.constraintTblName.equals("t3"));
@@ -1179,8 +1261,6 @@ public class TestReplicationScenariosAcrossInstances {
   public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwable {
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
-            .run("create table t1 (id int)")
-            .run("insert into table t1 values (10)")
             .run("create table t2 (place string) partitioned by (country string)")
             .run("insert into table t2 partition(country='india') values ('bangalore')")
             .run("insert into table t2 partition(country='uk') values ('london')")
@@ -1195,7 +1275,7 @@ public class TestReplicationScenariosAcrossInstances {
             .dump(primaryDbName, null);
 
     // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk".
-    // So, table "t1" and "t2" will exist and partition "india" will exist, rest failed as operation failed.
+    // So, table "t2" will exist and partition "india" will exist, rest failed as operation failed.
     BehaviourInjection<Partition, Partition> getPartitionStub
             = new BehaviourInjection<Partition, Partition>() {
       @Nullable
@@ -1220,9 +1300,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("repl status " + replicatedDbName)
             .verifyResult("null")
             .run("show tables")
-            .verifyResults(new String[] { "t1", "t2" })
-            .run("select id from t1")
-            .verifyResults(Arrays.asList("10"))
+            .verifyResults(new String[] {"t2" })
             .run("select country from t2 order by country")
             .verifyResults(Arrays.asList("india"))
             .run("show functions like '" + replicatedDbName + "*'")
@@ -1259,9 +1337,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("repl status " + replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("show tables")
-            .verifyResults(new String[] { "t1", "t2" })
-            .run("select id from t1")
-            .verifyResults(Arrays.asList("10"))
+            .verifyResults(new String[] { "t2" })
             .run("select country from t2 order by country")
             .verifyResults(Arrays.asList("india", "uk", "us"))
             .run("show functions like '" + replicatedDbName + "*'")

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index ad778e3..d43eed3 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -103,7 +103,8 @@ enum StageType {
   REPL_DUMP,
   REPL_BOOTSTRAP_LOAD,
   REPL_STATE_LOG,
-  REPL_TXN
+  REPL_TXN,
+  REPL_INCREMENTAL_LOAD
 }
 
 struct Stage {

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index b6eb12a..73bbe3a 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -119,7 +119,8 @@ int _kStageTypeValues[] = {
   StageType::REPL_DUMP,
   StageType::REPL_BOOTSTRAP_LOAD,
   StageType::REPL_STATE_LOG,
-  StageType::REPL_TXN
+  StageType::REPL_TXN,
+  StageType::REPL_INCREMENTAL_LOAD
 };
 const char* _kStageTypeNames[] = {
   "CONDITIONAL",
@@ -137,9 +138,10 @@ const char* _kStageTypeNames[] = {
   "REPL_DUMP",
   "REPL_BOOTSTRAP_LOAD",
   "REPL_STATE_LOG",
-  "REPL_TXN"
+  "REPL_TXN",
+  "REPL_INCREMENTAL_LOAD"
 };
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(17, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
 
 Adjacency::~Adjacency() throw() {

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index eb02107..04c749f 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -97,7 +97,8 @@ struct StageType {
     REPL_DUMP = 12,
     REPL_BOOTSTRAP_LOAD = 13,
     REPL_STATE_LOG = 14,
-    REPL_TXN = 15
+    REPL_TXN = 15,
+    REPL_INCREMENTAL_LOAD = 16
   };
 };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 08822b3..7eebe28 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -27,7 +27,8 @@ public enum StageType implements org.apache.thrift.TEnum {
   REPL_DUMP(12),
   REPL_BOOTSTRAP_LOAD(13),
   REPL_STATE_LOG(14),
-  REPL_TXN(15);
+  REPL_TXN(15),
+  REPL_INCREMENTAL_LOAD(16);
 
   private final int value;
 
@@ -80,6 +81,8 @@ public enum StageType implements org.apache.thrift.TEnum {
         return REPL_STATE_LOG;
       case 15:
         return REPL_TXN;
+      case 16:
+        return REPL_INCREMENTAL_LOAD;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index df4e41d..1a36d08 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -118,6 +118,7 @@ final class StageType {
   const REPL_BOOTSTRAP_LOAD = 13;
   const REPL_STATE_LOG = 14;
   const REPL_TXN = 15;
+  const REPL_INCREMENTAL_LOAD = 16;
   static public $__names = array(
     0 => 'CONDITIONAL',
     1 => 'COPY',
@@ -135,6 +136,7 @@ final class StageType {
     13 => 'REPL_BOOTSTRAP_LOAD',
     14 => 'REPL_STATE_LOG',
     15 => 'REPL_TXN',
+    16 => 'REPL_INCREMENTAL_LOAD',
   );
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 85d39fd..c0a2204 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -164,6 +164,7 @@ class StageType:
   REPL_BOOTSTRAP_LOAD = 13
   REPL_STATE_LOG = 14
   REPL_TXN = 15
+  REPL_INCREMENTAL_LOAD = 16
 
   _VALUES_TO_NAMES = {
     0: "CONDITIONAL",
@@ -182,6 +183,7 @@ class StageType:
     13: "REPL_BOOTSTRAP_LOAD",
     14: "REPL_STATE_LOG",
     15: "REPL_TXN",
+    16: "REPL_INCREMENTAL_LOAD",
   }
 
   _NAMES_TO_VALUES = {
@@ -201,6 +203,7 @@ class StageType:
     "REPL_BOOTSTRAP_LOAD": 13,
     "REPL_STATE_LOG": 14,
     "REPL_TXN": 15,
+    "REPL_INCREMENTAL_LOAD": 16,
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 6010f3d..61349a2 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -76,8 +76,9 @@ module StageType
   REPL_BOOTSTRAP_LOAD = 13
   REPL_STATE_LOG = 14
   REPL_TXN = 15
-  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"}
-  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze
+  REPL_INCREMENTAL_LOAD = 16
+  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD"}
+  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD]).freeze
 end
 
 class Adjacency

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 3a107b7..47a802f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
new file mode 100644
index 0000000..76ba975
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
+
+public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
+  private final static int ZERO_TASKS = 0;
+
+  @Override
+  public String getName() {
+    return (work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD");
+  }
+
+  /**
+   * Provides the root Tasks created as a result of this loadTask run which will be executed
+   * by the driver. It does not track details across multiple runs of LoadTask.
+   */
+  private static class Scope {
+    boolean database = false, table = false, partition = false;
+    List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
+  }
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    if (work.isIncrementalLoad()) {
+      return executeIncrementalLoad(driverContext);
+    } else {
+      return executeBootStrapLoad(driverContext);
+    }
+  }
+
+  private int executeBootStrapLoad(DriverContext driverContext) {
+    try {
+      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+      Context context = new Context(work.dumpDirectory, conf, getHive(),
+              work.sessionStateLineageState, driverContext.getCtx());
+      TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
+      /*
+          for now for simplicity we are doing just one directory ( one database ), come back to use
+          of multiple databases once we have the basic flow to chain creating of tasks in place for
+          a database ( directory )
+      */
+      BootstrapEventsIterator iterator = work.iterator();
+      ConstraintEventsIterator constraintIterator = work.constraintIterator();
+      /*
+      This is used to get hold of a reference during the current creation of tasks and is initialized
+      with "0" tasks such that it will be non consequential in any operations done with task tracker
+      compositions.
+       */
+      TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
+      TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
+      Scope scope = new Scope();
+      boolean loadingConstraint = false;
+      if (!iterator.hasNext() && constraintIterator.hasNext()) {
+        loadingConstraint = true;
+      }
+      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+        BootstrapEvent next;
+        if (!loadingConstraint) {
+          next = iterator.next();
+        } else {
+          next = constraintIterator.next();
+        }
+        switch (next.eventType()) {
+        case Database:
+          DatabaseEvent dbEvent = (DatabaseEvent) next;
+          dbTracker =
+              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
+                  .tasks();
+          loadTaskTracker.update(dbTracker);
+          if (work.hasDbState()) {
+            loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+          }
+          work.updateDbEventState(dbEvent.toState());
+          if (dbTracker.hasTasks()) {
+            scope.rootTasks.addAll(dbTracker.tasks());
+            scope.database = true;
+          }
+          dbTracker.debugLog("database");
+          break;
+        case Table: {
+          /*
+              Implicit assumption here is that database level is processed first before table level,
+              which will depend on the iterator used since it should provide the higher level directory
+              listing before providing the lower level listing. This is also required such that
+              the dbTracker /  tableTracker are setup correctly always.
+           */
+          TableContext tableContext =
+              new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
+          TableEvent tableEvent = (TableEvent) next;
+          LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
+                                              tableContext, loadTaskTracker);
+          tableTracker = loadTable.tasks();
+          setUpDependencies(dbTracker, tableTracker);
+          if (!scope.database && tableTracker.hasTasks()) {
+            scope.rootTasks.addAll(tableTracker.tasks());
+            scope.table = true;
+          }
+          /*
+            for table replication if we reach the max number of tasks then for the next run we will
+            try to reload the same table again, this is mainly for ease of understanding the code
+            as then we can avoid handling == > loading partitions for the table given that
+            the creation of table lead to reaching max tasks vs,  loading next table since current
+            one does not have partitions.
+           */
+
+          // for a table we explicitly try to load partitions as there is no separate partitions events.
+          LoadPartitions loadPartitions =
+              new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent,
+                      work.dbNameToLoadIn, tableContext);
+          TaskTracker partitionsTracker = loadPartitions.tasks();
+          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+              partitionsTracker);
+          tableTracker.debugLog("table");
+          partitionsTracker.debugLog("partitions for table");
+          break;
+        }
+        case Partition: {
+          /*
+              This will happen only when loading tables and we reach the limit of number of tasks we can create;
+              hence we know here that the table should exist and there should be a lastPartitionName
+          */
+          PartitionEvent event = (PartitionEvent) next;
+          TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn,
+              work.tableNameToLoadIn);
+          LoadPartitions loadPartitions =
+              new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker,
+                      event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
+          /*
+               the tableTracker here should be a new instance and not an existing one as this can
+               only happen when we break in between loading partitions.
+           */
+          TaskTracker partitionsTracker = loadPartitions.tasks();
+          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+              partitionsTracker);
+          partitionsTracker.debugLog("partitions");
+          break;
+        }
+        case Function: {
+          LoadFunction loadFunction = new LoadFunction(context, iterator.replLogger(),
+                                              (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
+          TaskTracker functionsTracker = loadFunction.tasks();
+          if (!scope.database) {
+            scope.rootTasks.addAll(functionsTracker.tasks());
+          } else {
+            setUpDependencies(dbTracker, functionsTracker);
+          }
+          loadTaskTracker.update(functionsTracker);
+          functionsTracker.debugLog("functions");
+          break;
+        }
+        case Constraint: {
+          LoadConstraint loadConstraint =
+              new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
+          TaskTracker constraintTracker = loadConstraint.tasks();
+          scope.rootTasks.addAll(constraintTracker.tasks());
+          loadTaskTracker.update(constraintTracker);
+          constraintTracker.debugLog("constraints");
+        }
+        }
+
+        if (!loadingConstraint && !iterator.currentDbHasNext()) {
+          createEndReplLogTask(context, scope, iterator.replLogger());
+        }
+      }
+      boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState()
+          || constraintIterator.hasNext();
+      if (addAnotherLoadTask) {
+        createBuilderTask(scope.rootTasks);
+      }
+      if (!iterator.hasNext() && !constraintIterator.hasNext()) {
+        loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+        work.updateDbEventState(null);
+      }
+      this.childTasks = scope.rootTasks;
+      /*
+      Since there can be multiple rounds of this run all of which will be tied to the same
+      query id -- generated in compile phase , adding a additional UUID to the end to print each run
+      in separate files.
+       */
+      LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
+
+      // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later
+      driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs());
+    }  catch (RuntimeException e) {
+      LOG.error("replication failed with run time exception", e);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("replication failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+    LOG.info("completed load task run : {}", work.executedLoadTask());
+    return 0;
+  }
+
+  private void createEndReplLogTask(Context context, Scope scope,
+                                                  ReplLogger replLogger) throws SemanticException {
+    Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters());
+    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
+    if (scope.rootTasks.isEmpty()) {
+      scope.rootTasks.add(replLogTask);
+    } else {
+      DAGTraversal.traverse(scope.rootTasks,
+          new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
+    }
+  }
+
+  /**
+   * There was a database update done before and we want to make sure we update the last repl
+   * id on this database as we are now going to switch to processing a new database.
+   *
+   * This has to be last task in the graph since if there are intermediate tasks and the last.repl.id
+   * is a root level task then in the execution phase the root level tasks will get executed first,
+   * however if any of the child tasks of the bootstrap load failed then even though the bootstrap has failed
+   * the last repl status of the target database will return a valid value, which will not represent
+   * the state of the database.
+   */
+  private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
+      throws SemanticException {
+  /*
+    we don't want to put any limits on this task as this is essential before we start
+    processing new database events.
+   */
+    TaskTracker taskTracker =
+        new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
+            new TaskTracker(maxTasks)).tasks();
+
+    AddDependencyToLeaves function = new AddDependencyToLeaves(taskTracker.tasks());
+    DAGTraversal.traverse(scope.rootTasks, function);
+
+    return taskTracker;
+  }
+
+  private void partitionsPostProcessing(BootstrapEventsIterator iterator,
+      Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
+      TaskTracker partitionsTracker) throws SemanticException {
+    setUpDependencies(tableTracker, partitionsTracker);
+    if (!scope.database && !scope.table) {
+      scope.rootTasks.addAll(partitionsTracker.tasks());
+      scope.partition = true;
+    }
+    loadTaskTracker.update(tableTracker);
+    loadTaskTracker.update(partitionsTracker);
+    if (partitionsTracker.hasReplicationState()) {
+      iterator.setReplicationState(partitionsTracker.replicationState());
+    }
+  }
+
+  /*
+      This sets up dependencies such that a child task is dependant on the parent to be complete.
+   */
+  private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
+    if (parentTasks.hasTasks()) {
+      for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
+        for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+          parentTask.addDependentTask(childTask);
+        }
+      }
+    } else {
+      for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+        parentTasks.addTask(childTask);
+      }
+    }
+  }
+
+  private void createBuilderTask(List<Task<? extends Serializable>> rootTasks) {
+    // Use loadTask as dependencyCollection
+    Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
+    DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
+  }
+
+  @Override
+  public StageType getType() {
+    return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD;
+  }
+
+  private int executeIncrementalLoad(DriverContext driverContext) {
+    try {
+      IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder();
+      this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG));
+      if (work.getIncrementalIterator().hasNext()) {
+        // attach a load task at the tail of task list to start the next iteration.
+        createBuilderTask(this.childTasks);
+      }
+      return 0;
+    } catch (Exception e) {
+      LOG.error("failed replication", e);
+      setException(e);
+      return 1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
new file mode 100644
index 0000000..8921e94
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.session.LineageState;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
+    Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class ReplLoadWork implements Serializable {
+  final String dbNameToLoadIn;
+  final String tableNameToLoadIn;
+  final String dumpDirectory;
+  private final transient BootstrapEventsIterator bootstrapIterator;
+  private final ConstraintEventsIterator constraintsIterator;
+  private final transient IncrementalLoadEventsIterator incrementalIterator;
+  private int loadTaskRunCount = 0;
+  private DatabaseEvent.State state = null;
+  private final transient IncrementalLoadTasksBuilder incrementalLoad;
+
+  /*
+  these are sessionState objects that are copied over to work to allow for parallel execution.
+  based on the current use case the methods are selectively synchronized, which might need to be
+  taken care when using other methods.
+  */
+  final LineageState sessionStateLineageState;
+
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
+      String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump) throws IOException {
+    this.tableNameToLoadIn = tableNameToLoadIn;
+    sessionStateLineageState = lineageState;
+    this.dumpDirectory = dumpDirectory;
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    if (isIncrementalDump) {
+      incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf);
+      this.bootstrapIterator = null;
+      this.constraintsIterator = null;
+      incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory,
+              incrementalIterator, hiveConf);
+    } else {
+      this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
+      this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
+      incrementalIterator = null;
+      incrementalLoad = null;
+    }
+  }
+
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern,
+      LineageState lineageState) throws IOException {
+    this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false);
+  }
+
+  public BootstrapEventsIterator iterator() {
+    return bootstrapIterator;
+  }
+
+  public ConstraintEventsIterator constraintIterator() {
+    return constraintsIterator;
+  }
+
+  int executedLoadTask() {
+    return ++loadTaskRunCount;
+  }
+
+  void updateDbEventState(DatabaseEvent.State state) {
+    this.state = state;
+  }
+
+  DatabaseEvent databaseEvent(HiveConf hiveConf) {
+    return state.toEvent(hiveConf);
+  }
+
+  boolean hasDbState() {
+    return state != null;
+  }
+
+  public boolean isIncrementalLoad() {
+    return incrementalIterator != null;
+  }
+
+  public IncrementalLoadEventsIterator getIncrementalIterator() {
+    return incrementalIterator;
+  }
+
+  public IncrementalLoadTasksBuilder getIncrementalLoadTaskBuilder() {
+    return incrementalLoad;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
deleted file mode 100644
index 18a8304..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-
-public class ReplUtils {
-
-  public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
-
-  /**
-   * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
-   */
-  public enum ReplLoadOpType {
-    LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
-  }
-
-  public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(
-          Table table, List<Map<String, String>> partitions) throws SemanticException {
-    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
-    int partPrefixLength = 0;
-    if (partitions.size() > 0) {
-      partPrefixLength = partitions.get(0).size();
-      // pick the length of the first ptn, we expect all ptns listed to have the same number of
-      // key-vals.
-    }
-    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
-    for (Map<String, String> ptn : partitions) {
-      // convert each key-value-map to appropriate expression.
-      ExprNodeGenericFuncDesc expr = null;
-      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
-        String key = kvp.getKey();
-        Object val = kvp.getValue();
-        String type = table.getPartColByName(key).getType();
-        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
-        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
-        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
-                "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
-        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
-      }
-      if (expr != null) {
-        partitionDesc.add(expr);
-      }
-    }
-    if (partitionDesc.size() > 0) {
-      partSpecs.put(partPrefixLength, partitionDesc);
-    }
-    return partSpecs;
-  }
-
-  public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
-          throws SemanticException {
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
-    return TaskFactory.get(replLogWork, conf);
-  }
-
-  public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec,
-                                               String dumpRoot, HiveConf conf) throws SemanticException {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
-
-    AlterTableDesc alterTblDesc =  new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
-    alterTblDesc.setProps(mapProp);
-    alterTblDesc.setOldName(
-            StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName()));
-    if (partSpec != null) {
-      alterTblDesc.setPartSpec(partSpec);
-    }
-    return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf);
-  }
-
-  public static boolean replCkptStatus(String dbName, Map<String, String> props, String dumpRoot)
-          throws InvalidOperationException {
-    // If ckpt property not set or empty means, bootstrap is not run on this object.
-    if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) {
-      if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) {
-        return true;
-      }
-      throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot
-              + " is not allowed as the target DB: " + dbName
-              + " is already bootstrap loaded by another Dump " + props.get(REPL_CHECKPOINT_KEY));
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
deleted file mode 100644
index 0313058..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-
-public class AddDependencyToLeaves implements DAGTraversal.Function {
-  private List<Task<? extends Serializable>> postDependencyCollectionTasks;
-
-  AddDependencyToLeaves(List<Task<? extends Serializable>> postDependencyCollectionTasks) {
-    this.postDependencyCollectionTasks = postDependencyCollectionTasks;
-  }
-
-  public AddDependencyToLeaves(Task<? extends Serializable> postDependencyTask) {
-    this(Collections.singletonList(postDependencyTask));
-  }
-
-
-  @Override
-  public void process(Task<? extends Serializable> task) {
-    if (task.getChildTasks() == null) {
-      postDependencyCollectionTasks.forEach(task::addDependentTask);
-    }
-  }
-
-  @Override
-  public boolean skipProcessing(Task<? extends Serializable> task) {
-    return postDependencyCollectionTasks.contains(task);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
deleted file mode 100644
index 50fe3ac..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
-
-public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
-  private final static int ZERO_TASKS = 0;
-
-  @Override
-  public String getName() {
-    return "REPL_BOOTSTRAP_LOAD";
-  }
-
-  /**
-   * Provides the root Tasks created as a result of this loadTask run which will be executed
-   * by the driver. It does not track details across multiple runs of LoadTask.
-   */
-  private static class Scope {
-    boolean database = false, table = false, partition = false;
-    List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
-  }
-
-  @Override
-  protected int execute(DriverContext driverContext) {
-    try {
-      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-      Context context = new Context(work.dumpDirectory, conf, getHive(),
-              work.sessionStateLineageState, driverContext.getCtx());
-      TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
-      /*
-          for now for simplicity we are doing just one directory ( one database ), come back to use
-          of multiple databases once we have the basic flow to chain creating of tasks in place for
-          a database ( directory )
-      */
-      BootstrapEventsIterator iterator = work.iterator();
-      ConstraintEventsIterator constraintIterator = work.constraintIterator();
-      /*
-      This is used to get hold of a reference during the current creation of tasks and is initialized
-      with "0" tasks such that it will be non consequential in any operations done with task tracker
-      compositions.
-       */
-      TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
-      TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
-      Scope scope = new Scope();
-      boolean loadingConstraint = false;
-      if (!iterator.hasNext() && constraintIterator.hasNext()) {
-        loadingConstraint = true;
-      }
-      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
-        BootstrapEvent next;
-        if (!loadingConstraint) {
-          next = iterator.next();
-        } else {
-          next = constraintIterator.next();
-        }
-        switch (next.eventType()) {
-        case Database:
-          DatabaseEvent dbEvent = (DatabaseEvent) next;
-          dbTracker =
-              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
-                  .tasks();
-          loadTaskTracker.update(dbTracker);
-          if (work.hasDbState()) {
-            loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
-          }
-          work.updateDbEventState(dbEvent.toState());
-          if (dbTracker.hasTasks()) {
-            scope.rootTasks.addAll(dbTracker.tasks());
-            scope.database = true;
-          }
-          dbTracker.debugLog("database");
-          break;
-        case Table: {
-          /*
-              Implicit assumption here is that database level is processed first before table level,
-              which will depend on the iterator used since it should provide the higher level directory
-              listing before providing the lower level listing. This is also required such that
-              the dbTracker /  tableTracker are setup correctly always.
-           */
-          TableContext tableContext =
-              new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
-          TableEvent tableEvent = (TableEvent) next;
-          LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
-                                              tableContext, loadTaskTracker);
-          tableTracker = loadTable.tasks();
-          setUpDependencies(dbTracker, tableTracker);
-          if (!scope.database && tableTracker.hasTasks()) {
-            scope.rootTasks.addAll(tableTracker.tasks());
-            scope.table = true;
-          }
-          /*
-            for table replication if we reach the max number of tasks then for the next run we will
-            try to reload the same table again, this is mainly for ease of understanding the code
-            as then we can avoid handling == > loading partitions for the table given that
-            the creation of table lead to reaching max tasks vs,  loading next table since current
-            one does not have partitions.
-           */
-
-          // for a table we explicitly try to load partitions as there is no separate partitions events.
-          LoadPartitions loadPartitions =
-              new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent,
-                      work.dbNameToLoadIn, tableContext);
-          TaskTracker partitionsTracker = loadPartitions.tasks();
-          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
-              partitionsTracker);
-          tableTracker.debugLog("table");
-          partitionsTracker.debugLog("partitions for table");
-          break;
-        }
-        case Partition: {
-          /*
-              This will happen only when loading tables and we reach the limit of number of tasks we can create;
-              hence we know here that the table should exist and there should be a lastPartitionName
-          */
-          PartitionEvent event = (PartitionEvent) next;
-          TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn,
-              work.tableNameToLoadIn);
-          LoadPartitions loadPartitions =
-              new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker,
-                      event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
-          /*
-               the tableTracker here should be a new instance and not an existing one as this can
-               only happen when we break in between loading partitions.
-           */
-          TaskTracker partitionsTracker = loadPartitions.tasks();
-          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
-              partitionsTracker);
-          partitionsTracker.debugLog("partitions");
-          break;
-        }
-        case Function: {
-          LoadFunction loadFunction = new LoadFunction(context, iterator.replLogger(),
-                                              (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
-          TaskTracker functionsTracker = loadFunction.tasks();
-          if (!scope.database) {
-            scope.rootTasks.addAll(functionsTracker.tasks());
-          } else {
-            setUpDependencies(dbTracker, functionsTracker);
-          }
-          loadTaskTracker.update(functionsTracker);
-          functionsTracker.debugLog("functions");
-          break;
-        }
-        case Constraint: {
-          LoadConstraint loadConstraint =
-              new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
-          TaskTracker constraintTracker = loadConstraint.tasks();
-          scope.rootTasks.addAll(constraintTracker.tasks());
-          loadTaskTracker.update(constraintTracker);
-          constraintTracker.debugLog("constraints");
-        }
-        }
-
-        if (!loadingConstraint && !iterator.currentDbHasNext()) {
-          createEndReplLogTask(context, scope, iterator.replLogger());
-        }
-      }
-      boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState()
-          || constraintIterator.hasNext();
-      createBuilderTask(scope.rootTasks, addAnotherLoadTask);
-      if (!iterator.hasNext() && !constraintIterator.hasNext()) {
-        loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
-        work.updateDbEventState(null);
-      }
-      this.childTasks = scope.rootTasks;
-      /*
-      Since there can be multiple rounds of this run all of which will be tied to the same
-      query id -- generated in compile phase , adding a additional UUID to the end to print each run
-      in separate files.
-       */
-      LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
-
-      // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later
-      driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs());
-    } catch (Exception e) {
-      LOG.error("failed replication", e);
-      setException(e);
-      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-    }
-    LOG.info("completed load task run : {}", work.executedLoadTask());
-    return 0;
-  }
-
-  private void createEndReplLogTask(Context context, Scope scope,
-                                                  ReplLogger replLogger) throws SemanticException {
-    Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters());
-    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
-    if (scope.rootTasks.isEmpty()) {
-      scope.rootTasks.add(replLogTask);
-    } else {
-      DAGTraversal.traverse(scope.rootTasks,
-          new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
-    }
-  }
-
-  /**
-   * There was a database update done before and we want to make sure we update the last repl
-   * id on this database as we are now going to switch to processing a new database.
-   *
-   * This has to be last task in the graph since if there are intermediate tasks and the last.repl.id
-   * is a root level task then in the execution phase the root level tasks will get executed first,
-   * however if any of the child tasks of the bootstrap load failed then even though the bootstrap has failed
-   * the last repl status of the target database will return a valid value, which will not represent
-   * the state of the database.
-   */
-  private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
-      throws SemanticException {
-  /*
-    we don't want to put any limits on this task as this is essential before we start
-    processing new database events.
-   */
-    TaskTracker taskTracker =
-        new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
-            new TaskTracker(maxTasks)).tasks();
-
-    AddDependencyToLeaves function = new AddDependencyToLeaves(taskTracker.tasks());
-    DAGTraversal.traverse(scope.rootTasks, function);
-
-    return taskTracker;
-  }
-
-  private void partitionsPostProcessing(BootstrapEventsIterator iterator,
-      Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
-      TaskTracker partitionsTracker) throws SemanticException {
-    setUpDependencies(tableTracker, partitionsTracker);
-    if (!scope.database && !scope.table) {
-      scope.rootTasks.addAll(partitionsTracker.tasks());
-      scope.partition = true;
-    }
-    loadTaskTracker.update(tableTracker);
-    loadTaskTracker.update(partitionsTracker);
-    if (partitionsTracker.hasReplicationState()) {
-      iterator.setReplicationState(partitionsTracker.replicationState());
-    }
-  }
-
-  /*
-      This sets up dependencies such that a child task is dependant on the parent to be complete.
-   */
-  private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
-    if (parentTasks.hasTasks()) {
-      for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
-        for (Task<? extends Serializable> childTask : childTasks.tasks()) {
-          parentTask.addDependentTask(childTask);
-        }
-      }
-    } else {
-      for (Task<? extends Serializable> childTask : childTasks.tasks()) {
-        parentTasks.addTask(childTask);
-      }
-    }
-  }
-
-  private void createBuilderTask(List<Task<? extends Serializable>> rootTasks,
-      boolean shouldCreateAnotherLoadTask) {
-  /*
-    use loadTask as dependencyCollection
-   */
-    if (shouldCreateAnotherLoadTask) {
-      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
-      DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
-    }
-  }
-
-  @Override
-  public StageType getType() {
-    return StageType.REPL_BOOTSTRAP_LOAD;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
deleted file mode 100644
index 048727f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.session.LineageState;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
-    Explain.Level.DEFAULT,
-    Explain.Level.EXTENDED })
-public class ReplLoadWork implements Serializable {
-  final String dbNameToLoadIn;
-  final String tableNameToLoadIn;
-  final String dumpDirectory;
-  private final BootstrapEventsIterator iterator;
-  private final ConstraintEventsIterator constraintsIterator;
-  private int loadTaskRunCount = 0;
-  private DatabaseEvent.State state = null;
-
-  /*
-  these are sessionState objects that are copied over to work to allow for parallel execution.
-  based on the current use case the methods are selectively synchronized, which might need to be
-  taken care when using other methods.
-  */
-  final LineageState sessionStateLineageState;
-
-  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
-      String tableNameToLoadIn, LineageState lineageState)
-      throws IOException {
-    this.tableNameToLoadIn = tableNameToLoadIn;
-    sessionStateLineageState = lineageState;
-    this.dumpDirectory = dumpDirectory;
-    this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
-    this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
-    this.dbNameToLoadIn = dbNameToLoadIn;
-  }
-
-  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern,
-      LineageState lineageState) throws IOException {
-    this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState);
-  }
-
-  public BootstrapEventsIterator iterator() {
-    return iterator;
-  }
-
-  public ConstraintEventsIterator constraintIterator() {
-    return constraintsIterator;
-  }
-
-  int executedLoadTask() {
-    return ++loadTaskRunCount;
-  }
-
-  void updateDbEventState(DatabaseEvent.State state) {
-    this.state = state;
-  }
-
-  DatabaseEvent databaseEvent(HiveConf hiveConf) {
-    return state.toEvent(hiveConf);
-  }
-
-  boolean hasDbState() {
-    return state != null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 89d2ac2..ebe0090 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -82,6 +82,15 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
     FileSystem fileSystem = path.getFileSystem(hiveConf);
     FileStatus[] fileStatuses =
         fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem));
+    if ((fileStatuses == null) || (fileStatuses.length == 0)) {
+      throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
+    }
+    if ((dbNameToLoadIn != null) && (fileStatuses.length > 1)) {
+      throw new IllegalArgumentException(
+              "Multiple dirs in "
+                      + dumpDirectory
+                      + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
+    }
 
     List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> {
       Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME);

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index 26f4892..d09b98c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 0270d2a..054153c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -31,8 +32,8 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 
 import java.io.Serializable;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index b886ff4..a7c8ca4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -26,9 +26,10 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
deleted file mode 100644
index f8f0801..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
-
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This class will be responsible to track how many tasks have been created,
- * organization of tasks such that after the number of tasks for next execution are created
- * we create a dependency collection task(DCT) -> another bootstrap task,
- * and then add DCT as dependent to all existing tasks that are created so the cycle can continue.
- */
-public class TaskTracker {
-  private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class);
-  /**
-   * used to identify the list of tasks at root level for a given level like table /  db / partition.
-   * this does not include the task dependency notion of "table tasks < ---- partition task"
-   */
-  private final List<Task<? extends Serializable>> tasks = new ArrayList<>();
-  private ReplicationState replicationState = null;
-  // since tasks themselves can be graphs we want to limit the number of created
-  // tasks including all of dependencies.
-  private int numberOfTasks = 0;
-  private final int maxTasksAllowed;
-
-  public TaskTracker(int defaultMaxTasks) {
-    maxTasksAllowed = defaultMaxTasks;
-  }
-
-  public TaskTracker(TaskTracker existing) {
-    maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks;
-  }
-
-  /**
-   * this method is used to identify all the tasks in a graph.
-   * the graph however might get created in a disjoint fashion, in which case we can just update
-   * the number of tasks using the "update" method.
-   */
-  public void addTask(Task<? extends Serializable> task) {
-    tasks.add(task);
-
-    List <Task<? extends Serializable>> visited = new ArrayList<>();
-    updateTaskCount(task, visited);
-  }
-
-  // This method is used to traverse the DAG created in tasks list and add the dependent task to
-  // the tail of each task chain.
-  public void addDependentTask(Task<? extends Serializable> dependent) {
-    if (tasks.isEmpty()) {
-      addTask(dependent);
-    } else {
-      DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
-
-      List<Task<? extends Serializable>> visited = new ArrayList<>();
-      updateTaskCount(dependent, visited);
-    }
-  }
-
-  private void updateTaskCount(Task<? extends Serializable> task,
-                               List <Task<? extends Serializable>> visited) {
-    numberOfTasks += 1;
-    visited.add(task);
-    if (task.getChildTasks() != null) {
-      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-        if (visited.contains(childTask)) {
-          continue;
-        }
-        updateTaskCount(childTask, visited);
-      }
-    }
-  }
-
-  public boolean canAddMoreTasks() {
-    return numberOfTasks < maxTasksAllowed;
-  }
-
-  public boolean hasTasks() {
-    return numberOfTasks != 0;
-  }
-
-  public void update(TaskTracker withAnother) {
-    numberOfTasks += withAnother.numberOfTasks;
-    if (withAnother.hasReplicationState()) {
-      this.replicationState = withAnother.replicationState;
-    }
-  }
-
-  public void setReplicationState(ReplicationState state) {
-    this.replicationState = state;
-  }
-
-  public boolean hasReplicationState() {
-    return replicationState != null;
-  }
-
-  public ReplicationState replicationState() {
-    return replicationState;
-  }
-
-  public List<Task<? extends Serializable>> tasks() {
-    return tasks;
-  }
-
-  public void debugLog(String forEventType) {
-    LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks,
-        tasks.size());
-  }
-
-  public int numberOfTasks() {
-    return numberOfTasks;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index f6493f7..c0cfc43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 419a511..089b529 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -27,10 +27,10 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
index b5b5b90..8e01fb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;