Posted to commits@hive.apache.org by da...@apache.org on 2017/08/08 18:43:28 UTC

[2/2] hive git commit: HIVE-16896: move replication load related work in semantic analysis phase to execution phase using a task (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)

HIVE-16896: move replication load related work in semantic analysis phase to execution phase using a task (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/92f764e0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/92f764e0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/92f764e0

Branch: refs/heads/master
Commit: 92f764e056cc87b8c5ea82b5372a9cbbf804ec33
Parents: 844ec34
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Aug 8 11:43:00 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Aug 8 11:43:00 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 ...TestReplicationScenariosAcrossInstances.java |  53 +++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java |  66 +++--
 ql/if/queryplan.thrift                          |   3 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.cpp   |   8 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.h     |   3 +-
 .../hadoop/hive/ql/plan/api/StageType.java      |   7 +-
 ql/src/gen/thrift/gen-php/Types.php             |   6 +-
 ql/src/gen/thrift/gen-py/queryplan/ttypes.py    |   9 +-
 ql/src/gen/thrift/gen-rb/queryplan_types.rb     |   7 +-
 .../java/org/apache/hadoop/hive/ql/Context.java |   2 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |   4 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |  13 +-
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    | 264 +++++++++++++++++
 .../ql/exec/repl/bootstrap/ReplLoadWork.java    |  71 +++++
 .../repl/bootstrap/events/BootstrapEvent.java   |  28 ++
 .../repl/bootstrap/events/DatabaseEvent.java    |  34 +++
 .../repl/bootstrap/events/FunctionEvent.java    |  33 +++
 .../repl/bootstrap/events/PartitionEvent.java   |  26 ++
 .../exec/repl/bootstrap/events/TableEvent.java  |  42 +++
 .../filesystem/BootstrapEventsIterator.java     | 133 +++++++++
 .../filesystem/DatabaseEventsIterator.java      | 141 +++++++++
 .../events/filesystem/FSDatabaseEvent.java      |  88 ++++++
 .../events/filesystem/FSFunctionEvent.java      |  39 +++
 .../events/filesystem/FSPartitionEvent.java     |  84 ++++++
 .../events/filesystem/FSTableEvent.java         | 123 ++++++++
 .../exec/repl/bootstrap/load/LoadDatabase.java  | 129 +++++++++
 .../exec/repl/bootstrap/load/LoadFunction.java  |  73 +++++
 .../repl/bootstrap/load/ReplicationState.java   |  58 ++++
 .../exec/repl/bootstrap/load/TaskTracker.java   | 113 ++++++++
 .../bootstrap/load/table/LoadPartitions.java    | 283 +++++++++++++++++++
 .../repl/bootstrap/load/table/LoadTable.java    | 216 ++++++++++++++
 .../repl/bootstrap/load/table/TableContext.java |  49 ++++
 .../exec/repl/bootstrap/load/util/Context.java  |  37 +++
 .../repl/bootstrap/load/util/PathUtils.java     | 105 +++++++
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  28 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 216 +-------------
 .../hadoop/hive/ql/plan/ImportTableDesc.java    |  12 +-
 .../repl/bootstrap/load/TaskTrackerTest.java    |  29 ++
 .../repl_load_requires_admin.q.out              |   4 -
 40 files changed, 2356 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c16880e..7cee344 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.conf;
 
 import com.google.common.base.Joiner;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -37,7 +36,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.HiveCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -442,6 +440,11 @@ public class HiveConf extends Configuration {
         "Inteval for cmroot cleanup thread."),
     REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/",
         "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
+    REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 1000,
+        "Provide and approximate of the max number of tasks that should be executed in before  \n" +
+            "dynamically generating the next set of tasks. The number is an approximate as we \n" +
+            "will stop at slightly higher number than above, the reason being some events might \n" +
+            "lead to an task increment that would cross the above limit"),
     REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5,
         "Number of threads that will be used to dump partition data information during repl dump."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index c431537..2af728f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
 import org.junit.AfterClass;
@@ -89,7 +90,7 @@ public class TestReplicationScenariosAcrossInstances {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
     replica.load(replicatedDbName, bootStrapDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
-        .verify(bootStrapDump.lastReplicationId);
+        .verifyResult(bootStrapDump.lastReplicationId);
 
     primary.run("CREATE FUNCTION " + primaryDbName
         + ".testFunction as 'hivemall.tools.string.StopwordUDF' "
@@ -99,16 +100,16 @@ public class TestReplicationScenariosAcrossInstances {
         primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
-        .verify(incrementalDump.lastReplicationId)
+        .verifyResult(incrementalDump.lastReplicationId)
         .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
-        .verify(replicatedDbName + ".testFunction");
+        .verifyResult(replicatedDbName + ".testFunction");
 
     // Test the idempotent behavior of CREATE FUNCTION
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName)
-            .verify(incrementalDump.lastReplicationId)
+        .verifyResult(incrementalDump.lastReplicationId)
             .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
-            .verify(replicatedDbName + ".testFunction");
+        .verifyResult(replicatedDbName + ".testFunction");
   }
 
   @Test
@@ -119,7 +120,7 @@ public class TestReplicationScenariosAcrossInstances {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
     replica.load(replicatedDbName, bootStrapDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
-        .verify(bootStrapDump.lastReplicationId);
+        .verifyResult(bootStrapDump.lastReplicationId);
 
     primary.run("Drop FUNCTION " + primaryDbName + ".testFunction ");
 
@@ -127,16 +128,16 @@ public class TestReplicationScenariosAcrossInstances {
         primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
-        .verify(incrementalDump.lastReplicationId)
+        .verifyResult(incrementalDump.lastReplicationId)
         .run("SHOW FUNCTIONS LIKE '*testfunction*'")
-        .verify(null);
+        .verifyResult(null);
 
     // Test the idempotent behavior of DROP FUNCTION
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName)
-            .verify(incrementalDump.lastReplicationId)
+        .verifyResult(incrementalDump.lastReplicationId)
             .run("SHOW FUNCTIONS LIKE '*testfunction*'")
-            .verify(null);
+        .verifyResult(null);
   }
 
   @Test
@@ -148,7 +149,7 @@ public class TestReplicationScenariosAcrossInstances {
 
     replica.load(replicatedDbName, bootStrapDump.dumpLocation)
         .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
-        .verify(replicatedDbName + ".testFunction");
+        .verifyResult(replicatedDbName + ".testFunction");
   }
 
   @Test
@@ -164,7 +165,7 @@ public class TestReplicationScenariosAcrossInstances {
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
-        .verify(replicatedDbName + ".anotherFunction");
+        .verifyResult(replicatedDbName + ".anotherFunction");
 
     FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus(
         new Path(
@@ -218,4 +219,32 @@ public class TestReplicationScenariosAcrossInstances {
         }).collect(Collectors.toList());
     return new Dependencies(collect);
   }
+
+  /*
+  From the hive logs (hive.log) we can also check for the info statement:
+  fgrep "Total Tasks" [location of hive.log]
+  Each line indicates one run of the load task.
+   */
+  @Test
+  public void testMultipleStagesOfReplicationLoadTask() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('bangalore')")
+        .run("insert into table t2 partition(country='india') values ('mumbai')")
+        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("create table t3 (rank int)")
+        .dump(primaryDbName, null);
+
+    // each table creation itself takes more than one task; given we set a max of 1, we should hit multiple runs.
+    replica.hiveConf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1);
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "t1", "t2", "t3" })
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId);
+
+  }
 }
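
The verify() helper is split in two: verifyResult checks a single expected value (possibly
null), while verifyResults checks a full expected result set. A usage sketch combining both,
based on the tests above:

    WarehouseInstance.Tuple dump = primary.dump(primaryDbName, null);
    replica.load(replicatedDbName, dump.dumpLocation)
        .run("repl status " + replicatedDbName)
        .verifyResult(dump.lastReplicationId)              // one expected value
        .run("show tables")
        .verifyResults(new String[] { "t1", "t2", "t3" }); // whole expected result set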

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 8dfab08..c084d4d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -56,7 +56,7 @@ class WarehouseInstance implements Closeable {
   final String functionsRoot;
   private Logger logger;
   private Driver driver;
-  private HiveConf hiveConf;
+  HiveConf hiveConf;
   MiniDFSCluster miniDFSCluster;
   private HiveMetaStoreClient client;
 
@@ -71,16 +71,18 @@ class WarehouseInstance implements Closeable {
     assert miniDFSCluster.isDataNodeUp();
     DistributedFileSystem fs = miniDFSCluster.getFileSystem();
 
+    Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
-    initialize(cmRootPath.toString(), hiveInTests);
+    initialize(cmRootPath.toString(), warehouseRoot.toString(), hiveInTests);
   }
 
   WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
     this(logger, cluster, true);
   }
 
-  private void initialize(String cmRoot, boolean hiveInTest) throws Exception {
+  private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
+      throws Exception {
     hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
     String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
     String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
@@ -95,6 +97,7 @@ class WarehouseInstance implements Closeable {
 
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
     // turn on db notification listener on meta store
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot);
     hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
     hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -181,38 +184,39 @@ class WarehouseInstance implements Closeable {
     return this;
   }
 
-  WarehouseInstance verify(String data) throws IOException {
-    return verifyResults(data == null ? new String[] {} : new String[] { data });
-  }
+    WarehouseInstance verifyResult(String data) throws IOException {
+      verifyResults(data == null ? new String[] {} : new String[] { data });
+      return this;
+    }
 
-  /**
-   * All the results that are read from the hive output will not preserve
-   * case sensitivity and will all be in lower case, hence we will check against
-   * only lower case data values.
-   * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case
-   * before assert.
-   */
-  WarehouseInstance verifyResults(String[] data) throws IOException {
-    List<String> results = getOutput();
-    logger.info("Expecting {}", StringUtils.join(data, ","));
-    logger.info("Got {}", results);
-    assertEquals(data.length, results.size());
-    for (int i = 0; i < data.length; i++) {
-      assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
+    /**
+     * Results read from the hive output do not preserve case sensitivity and
+     * are all in lower case, hence we check against lower-case data values
+     * only.
+     * The exception is null values, which are actually returned in upper case, hence we
+     * explicitly lower-case before asserting.
+     */
+    WarehouseInstance verifyResults(String[] data) throws IOException {
+      List<String> results = getOutput();
+      logger.info("Expecting {}", StringUtils.join(data, ","));
+      logger.info("Got {}", results);
+      assertEquals(data.length, results.size());
+      for (int i = 0; i < data.length; i++) {
+        assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
+      }
+      return this;
     }
-    return this;
-  }
 
-  List<String> getOutput() throws IOException {
-    List<String> results = new ArrayList<>();
-    try {
-      driver.getResults(results);
-    } catch (CommandNeedRetryException e) {
-      logger.warn(e.getMessage(), e);
-      throw new RuntimeException(e);
+    List<String> getOutput() throws IOException {
+      List<String> results = new ArrayList<>();
+      try {
+        driver.getResults(results);
+      } catch (CommandNeedRetryException e) {
+        logger.warn(e.getMessage(), e);
+        throw new RuntimeException(e);
+      }
+      return results;
     }
-    return results;
-  }
 
   private void printOutput() throws IOException {
     for (String s : getOutput()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index dc55805..00b0200 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -100,7 +100,8 @@ enum StageType {
   STATS,
   DEPENDENCY_COLLECTION,
   COLUMNSTATS,
-  REPLDUMP,
+  REPL_DUMP,
+  REPL_BOOTSTRAP_LOAD,
 }
 
 struct Stage {

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index 7254d50..f467da2 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -116,7 +116,8 @@ int _kStageTypeValues[] = {
   StageType::STATS,
   StageType::DEPENDENCY_COLLECTION,
   StageType::COLUMNSTATS,
-  StageType::REPLDUMP
+  StageType::REPL_DUMP,
+  StageType::REPL_BOOTSTRAP_LOAD
 };
 const char* _kStageTypeNames[] = {
   "CONDITIONAL",
@@ -131,9 +132,10 @@ const char* _kStageTypeNames[] = {
   "STATS",
   "DEPENDENCY_COLLECTION",
   "COLUMNSTATS",
-  "REPLDUMP"
+  "REPL_DUMP",
+  "REPL_BOOTSTRAP_LOAD"
 };
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(14, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
 
 Adjacency::~Adjacency() throw() {

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index 38d054b..ac87ef7 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -94,7 +94,8 @@ struct StageType {
     STATS = 9,
     DEPENDENCY_COLLECTION = 10,
     COLUMNSTATS = 11,
-    REPLDUMP = 12
+    REPL_DUMP = 12,
+    REPL_BOOTSTRAP_LOAD = 13
   };
 };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index deca574..11a8f6d 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -24,7 +24,8 @@ public enum StageType implements org.apache.thrift.TEnum {
   STATS(9),
   DEPENDENCY_COLLECTION(10),
   COLUMNSTATS(11),
-  REPLDUMP(12);
+  REPL_DUMP(12),
+  REPL_BOOTSTRAP_LOAD(13);
 
   private final int value;
 
@@ -70,7 +71,9 @@ public enum StageType implements org.apache.thrift.TEnum {
       case 11:
         return COLUMNSTATS;
       case 12:
-        return REPLDUMP;
+        return REPL_DUMP;
+      case 13:
+        return REPL_BOOTSTRAP_LOAD;
       default:
         return null;
     }
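
Renaming REPLDUMP to REPL_DUMP keeps its numeric id 12, so serialized plans remain readable;
REPL_BOOTSTRAP_LOAD takes the next id, 13. A small sanity sketch (not part of this commit)
using the standard accessors on Thrift-generated Java enums:

    assert StageType.findByValue(12) == StageType.REPL_DUMP;
    assert StageType.findByValue(13) == StageType.REPL_BOOTSTRAP_LOAD;
    assert StageType.REPL_BOOTSTRAP_LOAD.getValue() == 13;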

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index 4d902ee..68edfcd 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -114,7 +114,8 @@ final class StageType {
   const STATS = 9;
   const DEPENDENCY_COLLECTION = 10;
   const COLUMNSTATS = 11;
-  const REPLDUMP = 12;
+  const REPL_DUMP = 12;
+  const REPL_BOOTSTRAP_LOAD = 13;
   static public $__names = array(
     0 => 'CONDITIONAL',
     1 => 'COPY',
@@ -128,7 +129,8 @@ final class StageType {
     9 => 'STATS',
     10 => 'DEPENDENCY_COLLECTION',
     11 => 'COLUMNSTATS',
-    12 => 'REPLDUMP',
+    12 => 'REPL_DUMP',
+    13 => 'REPL_BOOTSTRAP_LOAD',
   );
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 9e29129..6bf65af 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -160,7 +160,8 @@ class StageType:
   STATS = 9
   DEPENDENCY_COLLECTION = 10
   COLUMNSTATS = 11
-  REPLDUMP = 12
+  REPL_DUMP = 12
+  REPL_BOOTSTRAP_LOAD = 13
 
   _VALUES_TO_NAMES = {
     0: "CONDITIONAL",
@@ -175,7 +176,8 @@ class StageType:
     9: "STATS",
     10: "DEPENDENCY_COLLECTION",
     11: "COLUMNSTATS",
-    12: "REPLDUMP",
+    12: "REPL_DUMP",
+    13: "REPL_BOOTSTRAP_LOAD",
   }
 
   _NAMES_TO_VALUES = {
@@ -191,7 +193,8 @@ class StageType:
     "STATS": 9,
     "DEPENDENCY_COLLECTION": 10,
     "COLUMNSTATS": 11,
-    "REPLDUMP": 12,
+    "REPL_DUMP": 12,
+    "REPL_BOOTSTRAP_LOAD": 13,
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 1433d4a..2730dde 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -72,9 +72,10 @@ module StageType
   STATS = 9
   DEPENDENCY_COLLECTION = 10
   COLUMNSTATS = 11
-  REPLDUMP = 12
-  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP"}
-  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP]).freeze
+  REPL_DUMP = 12
+  REPL_BOOTSTRAP_LOAD = 13
+  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD"}
+  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD]).freeze
 end
 
 class Adjacency

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index fdcf052..9183edf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -561,7 +561,7 @@ public class Context {
 
 
   private static final String MR_PREFIX = "-mr-";
-  private static final String EXT_PREFIX = "-ext-";
+  public static final String EXT_PREFIX = "-ext-";
   private static final String LOCAL_PREFIX = "-local-";
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 94d6c5a..91ac4bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
@@ -111,6 +113,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
     taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
     taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
+    taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
@@ -149,6 +152,7 @@ public final class TaskFactory {
     throw new RuntimeException("No task for work class " + workClass.getName());
   }
 
+  @SafeVarargs
   public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
       Task<? extends Serializable>... tasklist) {
     Task<T> ret = get((Class<T>) work.getClass(), conf);
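
With ReplLoadWork registered in taskvec, the generic factory resolves it to a ReplLoadTask. A
minimal sketch, assuming a valid dumpDirectory and using the ReplLoadWork constructor added
later in this patch:

    ReplLoadWork work = new ReplLoadWork(conf, dumpDirectory, dbNameOrPattern);
    Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf); // returns a ReplLoadTask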

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 7501ed7..34b6737 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -61,7 +61,7 @@ import java.util.List;
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
   private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
-  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+  private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
 
   private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
   private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
@@ -86,6 +86,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
     } catch (Exception e) {
       LOG.error("failed", e);
+      setException(e);
       return 1;
     }
     return 0;
@@ -262,10 +263,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       // make it easy to write .q unit tests, instead of unique id generation.
       // however, this does mean that in writing tests, we have to be aware that
       // repl dump will clash with prior dumps, and thus have to clean up properly.
-      if (work.testInjectDumpDir == null) {
+      if (ReplDumpWork.testInjectDumpDir == null) {
         return "next";
       } else {
-        return work.testInjectDumpDir;
+        return ReplDumpWork.testInjectDumpDir;
       }
     } else {
       return String.valueOf(System.currentTimeMillis());
@@ -284,9 +285,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         continue;
       }
       Path functionRoot = new Path(functionsRoot, functionName);
-      Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
+      Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME);
       try (JsonWriter jsonWriter =
-          new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) {
+          new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) {
         FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
         serializer.writeTo(jsonWriter, tuple.replicationSpec);
       }
@@ -315,6 +316,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
   @Override
   public StageType getType() {
-    return StageType.REPLDUMP;
+    return StageType.REPL_DUMP;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
new file mode 100644
index 0000000..6ea1754
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -0,0 +1,264 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
+
+public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
+  private final static int ZERO_TASKS = 0;
+
+  @Override
+  public String getName() {
+    return "REPL_BOOTSTRAP_LOAD";
+  }
+
+  /**
+   * Holds the root tasks created as a result of this load-task run, which will be executed
+   * by the driver. It does not track details across multiple runs of the load task.
+   */
+  private static class Scope {
+    boolean database = false, table = false, partition = false;
+    List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
+  }
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    try {
+      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+      Context context = new Context(conf, getHive());
+      TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
+      /*
+          for now, for simplicity, we are doing just one directory (one database); come back to
+          multiple databases once we have the basic flow for chaining task creation in place for
+          a database (directory)
+      */
+      BootstrapEventsIterator iterator = work.iterator();
+      /*
+      These hold references during the current round of task creation and are initialized
+      with "0" tasks so that they are inconsequential in any operations done with task tracker
+      compositions.
+       */
+      TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
+      TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
+      Scope scope = new Scope();
+      while (iterator.hasNext() && loadTaskTracker.canAddMoreTasks()) {
+        BootstrapEvent next = iterator.next();
+        switch (next.eventType()) {
+        case Database:
+          DatabaseEvent dbEvent = (DatabaseEvent) next;
+          dbTracker =
+              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
+                  .tasks();
+          loadTaskTracker.update(dbTracker);
+          if (work.hasDbState()) {
+            loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+          }
+          work.updateDbEventState(dbEvent.toState());
+          scope.database = true;
+          scope.rootTasks.addAll(dbTracker.tasks());
+          dbTracker.debugLog("database");
+          break;
+        case Table: {
+          /*
+              The implicit assumption here is that the database level is processed before the table
+              level; this depends on the iterator used, which should provide the higher-level directory
+              listing before the lower-level listing. This is also required so that
+              the dbTracker / tableTracker are always set up correctly.
+           */
+          TableContext tableContext =
+              new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
+          TableEvent tableEvent = (TableEvent) next;
+          LoadTable loadTable = new LoadTable(tableEvent, context, tableContext, loadTaskTracker);
+          tableTracker = loadTable.tasks();
+          if (!scope.database) {
+            scope.rootTasks.addAll(tableTracker.tasks());
+            scope.table = true;
+          }
+          setUpDependencies(dbTracker, tableTracker);
+          /*
+            for table replication, if we reach the max number of tasks, then on the next run we will
+            try to reload the same table again. This is mainly for ease of understanding the code,
+            as we can then avoid distinguishing between loading partitions for the table (because
+            creating the table caused us to reach max tasks) and loading the next table (because the
+            current one has no partitions).
+           */
+
+          // for a table we explicitly try to load partitions, as there are no separate partition events.
+          LoadPartitions loadPartitions =
+              new LoadPartitions(context, loadTaskTracker, tableEvent, work.dbNameToLoadIn,
+                  tableContext);
+          TaskTracker partitionsTracker = loadPartitions.tasks();
+          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+              partitionsTracker);
+          tableTracker.debugLog("table");
+          partitionsTracker.debugLog("partitions for table");
+          break;
+        }
+        case Partition: {
+          /*
+              This will happen only when loading tables and we reach the limit on the number of tasks we can create;
+              hence we know here that the table should exist and there should be a lastPartitionName.
+          */
+          PartitionEvent event = (PartitionEvent) next;
+          TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn,
+              work.tableNameToLoadIn);
+          LoadPartitions loadPartitions =
+              new LoadPartitions(context, tableContext, loadTaskTracker, event.asTableEvent(),
+                  work.dbNameToLoadIn,
+                  event.lastPartitionReplicated());
+          /*
+               the tableTracker here should be a new instance, not an existing one, as this can
+               only happen when we break in the middle of loading partitions.
+           */
+          TaskTracker partitionsTracker = loadPartitions.tasks();
+          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+              partitionsTracker);
+          partitionsTracker.debugLog("partitions");
+          break;
+        }
+        case Function: {
+          LoadFunction loadFunction =
+              new LoadFunction(context, (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
+          TaskTracker functionsTracker = loadFunction.tasks();
+          if (!scope.database) {
+            scope.rootTasks.addAll(functionsTracker.tasks());
+          } else {
+            setUpDependencies(dbTracker, functionsTracker);
+          }
+          loadTaskTracker.update(functionsTracker);
+          functionsTracker.debugLog("functions");
+          break;
+        }
+        }
+      }
+      boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState();
+      createBuilderTask(scope.rootTasks, addAnotherLoadTask);
+      if (!iterator.hasNext()) {
+        loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+      }
+      this.childTasks = scope.rootTasks;
+      LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
+    } catch (Exception e) {
+      LOG.error("failed replication", e);
+      setException(e);
+      return 1;
+    }
+    LOG.info("completed load task run : {}", work.executedLoadTask());
+    return 0;
+  }
+
+  /**
+   * A database update was done earlier, and we want to make sure we update the last repl
+   * id on this database, as we are now going to switch to processing a new database.
+   */
+  private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
+      throws SemanticException {
+  /*
+    we don't want to put any limits on this task as this is essential before we start
+    processing new database events.
+   */
+    TaskTracker taskTracker =
+        new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
+            new TaskTracker(maxTasks)).tasks();
+    scope.rootTasks.addAll(taskTracker.tasks());
+    return taskTracker;
+  }
+
+  private void partitionsPostProcessing(BootstrapEventsIterator iterator,
+      Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
+      TaskTracker partitionsTracker) throws SemanticException {
+    setUpDependencies(tableTracker, partitionsTracker);
+    if (!scope.database && !scope.table) {
+      scope.rootTasks.addAll(partitionsTracker.tasks());
+      scope.partition = true;
+    }
+    loadTaskTracker.update(tableTracker);
+    loadTaskTracker.update(partitionsTracker);
+    if (partitionsTracker.hasReplicationState()) {
+      iterator.setReplicationState(partitionsTracker.replicationState());
+    }
+  }
+
+  /*
+      This sets up dependencies such that a child task is dependent on the parent completing.
+   */
+  private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
+    for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
+      for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+        parentTask.addDependentTask(childTask);
+      }
+    }
+  }
+
+  private void createBuilderTask(List<Task<? extends Serializable>> rootTasks,
+      boolean shouldCreateAnotherLoadTask) {
+  /*
+    use loadTask as dependencyCollection
+   */
+    if (shouldCreateAnotherLoadTask) {
+      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
+      dependency(rootTasks, loadTask);
+    }
+  }
+
+  /**
+   * add the given load task as a dependent of every leaf node
+   */
+  private boolean dependency(List<Task<? extends Serializable>> tasks,
+      Task<ReplLoadWork> loadTask) {
+    if (tasks == null || tasks.isEmpty()) {
+      return true;
+    }
+    for (Task<? extends Serializable> task : tasks) {
+      boolean dependency = dependency(task.getChildTasks(), loadTask);
+      if (dependency) {
+        task.addDependentTask(loadTask);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.REPL_BOOTSTRAP_LOAD;
+  }
+}
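
The execute loop above is the heart of the change: each run creates at most roughly maxTasks
tasks and, if events remain (or partition state is pending), chains another ReplLoadTask at the
leaves via createBuilderTask so the driver comes back for the next batch. A toy, self-contained
illustration of that batching pattern (plain Java, not Hive code):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class BatchedLoader {
      // Consume events until the per-run budget is spent; if any remain, append a
      // follow-up "load" marker, analogous to createBuilderTask(...) above.
      static List<String> loadOnce(Iterator<String> events, int maxTasks) {
        List<String> tasks = new ArrayList<>();
        while (events.hasNext() && tasks.size() < maxTasks) {
          tasks.add("load:" + events.next());
        }
        if (events.hasNext()) {
          tasks.add("replLoad:next-batch");
        }
        return tasks;
      }
    }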

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
new file mode 100644
index 0000000..eb18e5f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -0,0 +1,71 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.plan.Explain;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
+    Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class ReplLoadWork implements Serializable {
+  final String dbNameToLoadIn;
+  final String tableNameToLoadIn;
+  private final BootstrapEventsIterator iterator;
+  private int loadTaskRunCount = 0;
+  private DatabaseEvent.State state = null;
+
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
+      String tableNameToLoadIn) throws IOException {
+    this.tableNameToLoadIn = tableNameToLoadIn;
+    this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf);
+    this.dbNameToLoadIn = dbNameToLoadIn;
+  }
+
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern)
+      throws IOException {
+    this(hiveConf, dumpDirectory, dbNameOrPattern, null);
+  }
+
+  public BootstrapEventsIterator iterator() {
+    return iterator;
+  }
+
+  int executedLoadTask() {
+    return ++loadTaskRunCount;
+  }
+
+  void updateDbEventState(DatabaseEvent.State state) {
+    this.state = state;
+  }
+
+  DatabaseEvent databaseEvent(HiveConf hiveConf) {
+    DatabaseEvent databaseEvent = state.toEvent(hiveConf);
+    state = null;
+    return databaseEvent;
+  }
+
+  boolean hasDbState() {
+    return state != null;
+  }
+}
\ No newline at end of file
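
ReplLoadWork is the serializable state shared across chained load-task runs. The
DatabaseEvent.State round-trip is what lets a later run finish the database-level work; a
minimal sketch of that hand-off, using the package-private methods as ReplLoadTask does:

    work.updateDbEventState(dbEvent.toState());        // stash the state for the next run
    // ... on a subsequent ReplLoadTask run ...
    if (work.hasDbState()) {
      DatabaseEvent replay = work.databaseEvent(conf); // rebuilds the event and clears the state
    }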

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
new file mode 100644
index 0000000..db2b0ac
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
@@ -0,0 +1,28 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+public interface BootstrapEvent {
+
+  EventType eventType();
+
+  enum EventType {
+    Database, Table, Function, Partition
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java
new file mode 100644
index 0000000..6d6c336
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java
@@ -0,0 +1,34 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+
+public interface DatabaseEvent extends BootstrapEvent {
+  Database dbInMetadata(String dbNameToOverride) throws SemanticException;
+
+  State toState();
+
+  interface State extends Serializable {
+    DatabaseEvent toEvent(HiveConf hiveConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java
new file mode 100644
index 0000000..30bb747
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java
@@ -0,0 +1,33 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This exposes the FileSystem implementation outside, which it should NOT do.
+ * <p>
+ * Since bootstrap and incremental replication for functions are handled similarly, there
+ * is additional work to make sure we pass the event object from both places.
+ *
+ * @see org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler.FunctionDescBuilder
+ * which would mostly be merged here.
+ */
+public interface FunctionEvent extends BootstrapEvent {
+  Path rootDir();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
new file mode 100644
index 0000000..3b260d6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
@@ -0,0 +1,26 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+
+public interface PartitionEvent extends TableEvent {
+  AddPartitionDesc lastPartitionReplicated();
+
+  TableEvent asTableEvent();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
new file mode 100644
index 0000000..e817f5f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
@@ -0,0 +1,42 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+
+import java.util.List;
+
+public interface TableEvent extends BootstrapEvent {
+  ImportTableDesc tableDesc(String dbName) throws SemanticException;
+
+  List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
+      throws SemanticException;
+
+  ReplicationSpec replicationSpec();
+
+  boolean shouldNotReplicate();
+
+  /**
+   * This exposes the FileSystem implementation outside, which it should NOT do.
+   */
+  Path metadataPath();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
new file mode 100644
index 0000000..4e635ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -0,0 +1,133 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * The replication layout, starting from the root directory of a replication dump, is:
+ * db
+ *    table1
+ *        _metadata
+ *        data
+ *          _files
+ *    table2
+ *        _metadata
+ *        data
+ *          _files
+ *    _functions
+ *        functionName1
+ *          _metadata
+ *        functionName2
+ *          _metadata
+ * This class understands this layout and hence helps subsequent bootstrap tasks identify
+ * where the last set of tasks stopped executing and from where this task should resume replication.
+ * Since replication needs a hierarchy of tasks, we have to make sure that the db level is
+ * processed before tables, and tables before partitions, etc.
+ *
+ * Based on how the metadata is exported to files, we currently have to take care of the following:
+ * 1. Make sure the db level is processed first, as it is required before table / function processing.
+ * 2. Table before partition is not explicitly required, as table and partition metadata are in the same file.
+ *
+ *
+ * Future integrations with other event sources, such as Kafka, would require implementing an Iterator<BootstrapEvent>.
+ *
+ */
+public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
+  private DatabaseEventsIterator currentDatabaseIterator = null;
+  /*
+      Lists the directories for which db-level operations must be handled first during
+      replication; in our case that is only db creation on the replica warehouse.
+   */
+  private Iterator<DatabaseEventsIterator> dbEventsIterator;
+
+  public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException {
+    Path path = new Path(dumpDirectory);
+    FileSystem fileSystem = path.getFileSystem(hiveConf);
+    FileStatus[] fileStatuses =
+        fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem));
+
+    List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> {
+      Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME);
+      try {
+        return fileSystem.exists(metadataPath);
+      } catch (IOException e) {
+        throw new RuntimeException("could not determine if exists : " + metadataPath.toString(), e);
+      }
+    }).collect(Collectors.toList());
+    dbEventsIterator = dbsToCreate.stream().map(f -> {
+      try {
+        return new DatabaseEventsIterator(f.getPath(), hiveConf);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Error while creating event iterator for db at path " + f.getPath().toString(), e);
+      }
+    }).collect(Collectors.toList()).iterator();
+  }
+
+  @Override
+  public boolean hasNext() {
+    while (true) {
+      if (currentDatabaseIterator == null) {
+        if (dbEventsIterator.hasNext()) {
+          currentDatabaseIterator = dbEventsIterator.next();
+        } else {
+          return false;
+        }
+      } else if (currentDatabaseIterator.hasNext()) {
+        return true;
+      } else {
+        currentDatabaseIterator = null;
+      }
+    }
+  }
+
+  @Override
+  public BootstrapEvent next() {
+    return currentDatabaseIterator.next();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("This operation is not supported");
+  }
+
+  @Override
+  public void forEachRemaining(Consumer<? super BootstrapEvent> action) {
+    throw new UnsupportedOperationException("This operation is not supported");
+  }
+
+  public void setReplicationState(ReplicationState replicationState) {
+    this.currentDatabaseIterator.replicationState = replicationState;
+  }
+}
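
A minimal sketch of how the load task might drain this iterator, dispatching on
event type; the loadDatabase/loadTable/loadFunction/loadPartitions handlers are
placeholder names, not the actual ReplLoadTask methods:

  BootstrapEventsIterator iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf);
  while (iterator.hasNext()) {
    BootstrapEvent event = iterator.next();
    switch (event.eventType()) {
    case Database:  // guaranteed to arrive before the db's tables/functions
      loadDatabase((DatabaseEvent) event);
      break;
    case Table:
      loadTable((TableEvent) event);
      break;
    case Function:
      loadFunction((FunctionEvent) event);
      break;
    case Partition: // only seen when resuming from a partition-level ReplicationState
      loadPartitions((PartitionEvent) event);
      break;
    }
  }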

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
new file mode 100644
index 0000000..3100875
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -0,0 +1,141 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME;
+
+class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(DatabaseEventsIterator.class);
+  private RemoteIterator<LocatedFileStatus> remoteIterator;
+
+  private final Path dbLevelPath;
+  private HiveConf hiveConf;
+  ReplicationState replicationState;
+  private Path next = null, previous = null;
+  private boolean databaseEventProcessed = false;
+
+  DatabaseEventsIterator(Path dbLevelPath, HiveConf hiveConf) throws IOException {
+    this.dbLevelPath = dbLevelPath;
+    this.hiveConf = hiveConf;
+    FileSystem fileSystem = dbLevelPath.getFileSystem(hiveConf);
+    // this check exists only for the use case where we are doing table-only replication,
+    // not database-level replication
+    if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) {
+      databaseEventProcessed = true;
+    }
+    remoteIterator = fileSystem.listFiles(dbLevelPath, true);
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      if (!databaseEventProcessed) {
+        next = dbLevelPath;
+        return true;
+      }
+
+      if (replicationState == null && next == null) {
+        while (remoteIterator.hasNext()) {
+          LocatedFileStatus status = remoteIterator.next();
+          if (status.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
+            String relativePath = status.getPath().toString().replace(dbLevelPath.toString(), "");
+            List<String> filteredNames = Arrays.stream(relativePath.split(Path.SEPARATOR))
+                .filter(StringUtils::isNotBlank)
+                .collect(Collectors.toList());
+            if (filteredNames.size() == 1) {
+              // a single path component is the db-level _metadata file, which is
+              // already tracked via databaseEventProcessed
+            } else {
+              this.next = status.getPath().getParent();
+              return true;
+            }
+          }
+        }
+        return false;
+      }
+      return true;
+    } catch (Exception e) {
+      // TODO: maybe add retry logic here.
+      throw new RuntimeException("could not traverse the dump directory via remote iterator: " + dbLevelPath,
+          e);
+    }
+  }
+
+  /*
+  We handle three scenarios, each as a special case:
+  1. the db-level _metadata event
+  2. a subsequent loadTask resuming from the previous replicationState
+  3. other events: these can only be table or function _metadata.
+   */
+  @Override
+  public BootstrapEvent next() {
+    if (!databaseEventProcessed) {
+      FSDatabaseEvent event = new FSDatabaseEvent(hiveConf, next.toString());
+      databaseEventProcessed = true;
+      return postProcessing(event);
+    }
+
+    if (replicationState != null) {
+      return eventForReplicationState();
+    }
+
+    String currentPath = next.toString();
+    if (currentPath.contains(FUNCTIONS_ROOT_DIR_NAME)) {
+      LOG.debug("functions directory: {}", next.toString());
+      return postProcessing(new FSFunctionEvent(next));
+    }
+    return postProcessing(new FSTableEvent(hiveConf, next.toString()));
+  }
+
+  private BootstrapEvent postProcessing(BootstrapEvent bootstrapEvent) {
+    previous = next;
+    next = null;
+    LOG.debug("processing " + previous);
+    return bootstrapEvent;
+  }
+
+  private BootstrapEvent eventForReplicationState() {
+    if (replicationState.partitionState != null) {
+      BootstrapEvent
+          bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), replicationState);
+      replicationState = null;
+      return bootstrapEvent;
+    } else if (replicationState.lastTableReplicated != null) {
+      FSTableEvent event = new FSTableEvent(hiveConf, previous.toString());
+      replicationState = null;
+      return event;
+    }
+    throw new IllegalStateException("for replicationState " + replicationState.toString());
+  }
+}
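
To make the path-depth check in hasNext() concrete, here is what the filtering
does for a hypothetical dump rooted at /repl/dump/db1 (paths are illustrative):

  // dbLevelPath = /repl/dump/db1
  //   /repl/dump/db1/_metadata               -> 1 component  -> db-level, already
  //                                             tracked via databaseEventProcessed
  //   /repl/dump/db1/t1/_metadata            -> 2 components -> table event
  //   /repl/dump/db1/_functions/f1/_metadata -> 3 components -> function event
  String relativePath = "/repl/dump/db1/t1/_metadata".replace("/repl/dump/db1", "");
  long depth = Arrays.stream(relativePath.split(Path.SEPARATOR))
      .filter(StringUtils::isNotBlank)
      .count(); // 2, so next() will emit an FSTableEvent for the parent directory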

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java
new file mode 100644
index 0000000..48c908a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java
@@ -0,0 +1,88 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+
+public class FSDatabaseEvent implements DatabaseEvent {
+
+  private final Path dbMetadataFile;
+  private final FileSystem fileSystem;
+
+  FSDatabaseEvent(HiveConf hiveConf, String dbDumpDirectory) {
+    try {
+      this.dbMetadataFile = new Path(dbDumpDirectory, EximUtil.METADATA_NAME);
+      this.fileSystem = dbMetadataFile.getFileSystem(hiveConf);
+    } catch (Exception e) {
+      String message = "Error while identifying the filesystem for db "
+          + "metadata file in " + dbDumpDirectory;
+      throw new RuntimeException(message, e);
+    }
+  }
+
+  @Override
+  public Database dbInMetadata(String dbNameToOverride) throws SemanticException {
+    try {
+      MetaData rv = EximUtil.readMetaData(fileSystem, dbMetadataFile);
+      Database dbObj = rv.getDatabase();
+      if (dbObj == null) {
+        throw new IllegalArgumentException(
+            "_metadata file read did not contain a db object - invalid dump.");
+      }
+
+      // override the db name if provided in repl load command
+      if (StringUtils.isNotBlank(dbNameToOverride)) {
+        dbObj.setName(dbNameToOverride);
+      }
+      return dbObj;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  @Override
+  public State toState() {
+    return new FSDBState(dbMetadataFile.getParent().toString());
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Database;
+  }
+
+  static class FSDBState implements DatabaseEvent.State {
+    final String dbDumpDirectory;
+
+    FSDBState(String dbDumpDirectory) {
+      this.dbDumpDirectory = dbDumpDirectory;
+    }
+
+    @Override
+    public DatabaseEvent toEvent(HiveConf hiveConf) {
+      return new FSDatabaseEvent(hiveConf, dbDumpDirectory);
+    }
+  }
+}
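
The State indirection exists so that a resumable load can record where it
stopped and rebuild the event later; a sketch of the round-trip, with the
persistence of the state object left abstract as an assumption:

  DatabaseEvent.State state = databaseEvent.toState(); // captures only the dump directory
  // ... persist `state` alongside the load task's checkpoint, restart, then:
  DatabaseEvent restored = state.toEvent(hiveConf);    // _metadata is re-read lazily
  Database db = restored.dbInMetadata("target_db");    // optional db name override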

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java
new file mode 100644
index 0000000..5b13ffc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java
@@ -0,0 +1,39 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+
+public class FSFunctionEvent implements FunctionEvent {
+  private final Path rootDir;
+
+  FSFunctionEvent(Path rootDir) {
+    this.rootDir = rootDir;
+  }
+
+  @Override
+  public Path rootDir() {
+    return rootDir;
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Function;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
new file mode 100644
index 0000000..9b71a2e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -0,0 +1,84 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+
+import java.util.List;
+
+public class FSPartitionEvent implements PartitionEvent {
+
+  private final ReplicationState replicationState;
+  private final TableEvent tableEvent;
+
+  FSPartitionEvent(HiveConf hiveConf, String metadataDir,
+      ReplicationState replicationState) {
+    tableEvent = new FSTableEvent(hiveConf, metadataDir);
+    this.replicationState = replicationState;
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Partition;
+  }
+
+  @Override
+  public AddPartitionDesc lastPartitionReplicated() {
+    assert replicationState != null && replicationState.partitionState != null;
+    return replicationState.partitionState.lastReplicatedPartition;
+  }
+
+  @Override
+  public TableEvent asTableEvent() {
+    return tableEvent;
+  }
+
+  @Override
+  public ImportTableDesc tableDesc(String dbName) throws SemanticException {
+    return tableEvent.tableDesc(dbName);
+  }
+
+  @Override
+  public List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
+      throws SemanticException {
+    return tableEvent.partitionDescriptions(tblDesc);
+  }
+
+  @Override
+  public ReplicationSpec replicationSpec() {
+    return tableEvent.replicationSpec();
+  }
+
+  @Override
+  public boolean shouldNotReplicate() {
+    return tableEvent.shouldNotReplicate();
+  }
+
+  @Override
+  public Path metadataPath() {
+    return tableEvent.metadataPath();
+  }
+}
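
FSPartitionEvent is essentially a resume marker: it decorates the table's
FSTableEvent with the ReplicationState captured when the previous run stopped.
A sketch of how a loader might skip partitions that were already replicated;
comparing by partition spec is an assumption of this sketch:

  AddPartitionDesc lastDone = partitionEvent.lastPartitionReplicated();
  ImportTableDesc tblDesc = partitionEvent.tableDesc(dbName);
  boolean pastLast = false;
  for (AddPartitionDesc desc : partitionEvent.partitionDescriptions(tblDesc)) {
    if (!pastLast) {
      // skip everything up to and including the last replicated partition
      pastLast = desc.getPartition(0).getPartSpec()
          .equals(lastDone.getPartition(0).getPartSpec());
      continue;
    }
    // schedule a replication task for `desc`
  }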

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
new file mode 100644
index 0000000..f313404
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -0,0 +1,123 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FSTableEvent implements TableEvent {
+  private final Path fromPath;
+  private final MetaData metadata;
+
+  FSTableEvent(HiveConf hiveConf, String metadataDir) {
+    try {
+      URI fromURI = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(metadataDir));
+      fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+      FileSystem fs = FileSystem.get(fromURI, hiveConf);
+      metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean shouldNotReplicate() {
+    ReplicationSpec spec = metadata.getReplicationSpec();
+    return spec.isNoop() || !spec.isInReplicationScope();
+  }
+
+  @Override
+  public Path metadataPath() {
+    return fromPath;
+  }
+
+  @Override
+  public ImportTableDesc tableDesc(String dbName) throws SemanticException {
+    try {
+      Table table = new Table(metadata.getTable());
+      ImportTableDesc tableDesc = new ImportTableDesc(dbName, table);
+      tableDesc.setReplicationSpec(metadata.getReplicationSpec());
+      return tableDesc;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  @Override
+  public List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
+      throws SemanticException {
+    List<AddPartitionDesc> descs = new ArrayList<>();
+    // TODO: if partitions are loaded lazily via the iterator, we will have to avoid
+    // converting everything here, as that defeats the purpose of lazy loading.
+    for (Partition partition : metadata.getPartitions()) {
+      // TODO: this should ideally not create AddPartitionDesc per partition
+      AddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition);
+      descs.add(partsDesc);
+    }
+    return descs;
+  }
+
+  private AddPartitionDesc partitionDesc(Path fromPath,
+      ImportTableDesc tblDesc, Partition partition) throws SemanticException {
+    try {
+      AddPartitionDesc partsDesc =
+          new AddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(),
+              EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
+              partition.getSd().getLocation(), partition.getParameters());
+      AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0);
+      partDesc.setInputFormat(partition.getSd().getInputFormat());
+      partDesc.setOutputFormat(partition.getSd().getOutputFormat());
+      partDesc.setNumBuckets(partition.getSd().getNumBuckets());
+      partDesc.setCols(partition.getSd().getCols());
+      partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib());
+      partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
+      partDesc.setBucketCols(partition.getSd().getBucketCols());
+      partDesc.setSortCols(partition.getSd().getSortCols());
+      partDesc.setLocation(new Path(fromPath,
+          Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+      partsDesc.setReplicationSpec(metadata.getReplicationSpec());
+      return partsDesc;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  @Override
+  public ReplicationSpec replicationSpec() {
+    return metadata.getReplicationSpec();
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Table;
+  }
+}
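
To make the descriptor construction concrete: for a table partitioned by
(ds, hr), the location computed at the end of partitionDesc() resolves under
the table's dump path. An illustrative check with hypothetical values
(Warehouse.makePartName throws MetaException, handled by the enclosing
try/catch above):

  List<FieldSchema> partCols = Arrays.asList(
      new FieldSchema("ds", "string", null),
      new FieldSchema("hr", "int", null));
  String partName = Warehouse.makePartName(partCols, Arrays.asList("2017-08-08", "12"));
  // partName == "ds=2017-08-08/hr=12"
  Path location = new Path(fromPath, partName); // i.e. <tableDumpDir>/ds=2017-08-08/hr=12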

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
new file mode 100644
index 0000000..bab64ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -0,0 +1,129 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+public class LoadDatabase {
+
+  final Context context;
+  final TaskTracker tracker;
+
+  private final DatabaseEvent event;
+  private final String dbNameToLoadIn;
+
+  public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
+      TaskTracker loadTaskTracker) {
+    this.context = context;
+    this.event = event;
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    this.tracker = new TaskTracker(loadTaskTracker);
+  }
+
+  public TaskTracker tasks() throws SemanticException {
+    try {
+      Database dbInMetadata = readDbMetadata();
+      Task<? extends Serializable> dbRootTask = existEmptyDb(dbInMetadata.getName())
+          ? alterDbTask(dbInMetadata, context.hiveConf)
+          : createDbTask(dbInMetadata);
+      tracker.addTask(dbRootTask);
+      return tracker;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  Database readDbMetadata() throws SemanticException {
+    return event.dbInMetadata(dbNameToLoadIn);
+  }
+
+  private Task<? extends Serializable> createDbTask(Database dbObj) {
+    CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
+    createDbDesc.setName(dbObj.getName());
+    createDbDesc.setComment(dbObj.getDescription());
+
+    /*
+    Explicitly remove the last repl id (ReplicationSpec.KEY.CURR_STATE_ID) from the db
+    object's parameters: the load task runs multiple times, and explicit logic elsewhere
+    prevents updates to tables once the db-level last repl id is set. The AlterDatabase
+    task created at the end of processing a database restores it.
+     */
+    Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
+    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    createDbDesc.setDatabaseProperties(parameters);
+    // note that we do not set location - for repl load, we want it auto-created.
+    createDbDesc.setIfNotExists(false);
+    // If the db already exists, we want that to be an error condition: REPL LOAD is not
+    // intended to replace an existing db.
+    // TODO: we might revisit this for create-drop-recreate cases; needs some thought.
+    DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), createDbDesc);
+    return TaskFactory.get(work, context.hiveConf);
+  }
+
+  private static Task<? extends Serializable> alterDbTask(Database dbObj, HiveConf hiveConf) {
+    AlterDatabaseDesc alterDbDesc =
+        new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null);
+    DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc);
+    return TaskFactory.get(work, hiveConf);
+  }
+
+  private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException {
+    Database db = context.hiveDb.getDatabase(dbName);
+    if (db == null) {
+      return false;
+    }
+    List<String> allTables = context.hiveDb.getAllTables(dbName);
+    List<String> allFunctions = context.hiveDb.getFunctions(dbName, "*");
+    if (allTables.isEmpty() && allFunctions.isEmpty()) {
+      return true;
+    }
+    throw new InvalidOperationException(
+        "Database " + db.getName() + " is not empty. One or more tables/functions exist.");
+  }
+
+  public static class AlterDatabase extends LoadDatabase {
+
+    public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
+        TaskTracker loadTaskTracker) {
+      super(context, event, dbNameToLoadIn, loadTaskTracker);
+    }
+
+    @Override
+    public TaskTracker tasks() throws SemanticException {
+      tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf));
+      return tracker;
+    }
+  }
+}
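
A sketch of how LoadDatabase and its AlterDatabase variant are meant to be
driven; the context, event, and tracker are assumed to come from the
surrounding ReplLoadTask:

  // First pass: CREATE DATABASE (or ALTER, if an empty db already exists), with
  // the last-repl-id parameter (ReplicationSpec.KEY.CURR_STATE_ID) stripped.
  TaskTracker dbTracker =
      new LoadDatabase(context, databaseEvent, dbNameToLoadIn, loadTaskTracker).tasks();
  // After all tables/functions of this db are scheduled: restore the replication
  // checkpoint by altering the db with its full parameter set.
  TaskTracker alterTracker =
      new LoadDatabase.AlterDatabase(context, databaseEvent, dbNameToLoadIn, loadTaskTracker).tasks();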