Posted to commits@hive.apache.org by da...@apache.org on 2017/07/05 18:03:10 UTC

hive git commit: HIVE-16893: move replication dump related work in semantic analysis phase to execution phase using a task (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)

Repository: hive
Updated Branches:
  refs/heads/master ae927155d -> c39b87958


HIVE-16893: move replication dump related work in semantic analysis phase to execution phase using a task (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c39b8795
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c39b8795
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c39b8795

Branch: refs/heads/master
Commit: c39b879581325e7e9e4df0b8531f80e34ab11d91
Parents: ae92715
Author: Daniel Dai <da...@hortonworks.com>
Authored: Wed Jul 5 11:02:38 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Wed Jul 5 11:02:38 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/parse/TestReplicationScenarios.java |   3 +-
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   3 +-
 ql/if/queryplan.thrift                          |   1 +
 ql/src/gen/thrift/gen-cpp/queryplan_types.cpp   |   8 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.h     |   3 +-
 .../hadoop/hive/ql/plan/api/StageType.java      |   5 +-
 ql/src/gen/thrift/gen-php/Types.php             |   2 +
 ql/src/gen/thrift/gen-py/queryplan/ttypes.py    |   3 +
 ql/src/gen/thrift/gen-rb/queryplan_types.rb     |   5 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |   4 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  | 347 +++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java  |  75 ++++
 .../hive/ql/parse/ExportSemanticAnalyzer.java   |   4 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 305 +---------------
 .../hive/ql/parse/repl/dump/TableExport.java    |  32 +-
 .../parse/TestReplicationSemanticAnalyzer.java  |  71 ++--
 16 files changed, 507 insertions(+), 364 deletions(-)
----------------------------------------------------------------------
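
The gist of this change: REPL DUMP no longer performs the dump inside ReplicationSemanticAnalyzer at compile time. The analyzer now only builds a ReplDumpWork and queues a ReplDumpTask as a root task, and the dump itself runs when the driver executes that task. Below is a minimal, self-contained sketch of that "describe now, execute later" pattern; DumpWork, DumpTask and ReplDumpFlowSketch are made-up stand-ins for illustration only, not Hive's real Task/TaskFactory API (the real implementation is in ReplDumpTask.java and ReplDumpWork.java in the diff that follows).

  import java.util.ArrayList;
  import java.util.List;

  public class ReplDumpFlowSketch {

    /** Stand-in for ReplDumpWork: only data describing the dump, no metastore calls. */
    static class DumpWork {
      final String dbPattern;
      final Long eventFrom;   // null => bootstrap dump, non-null => incremental dump

      DumpWork(String dbPattern, Long eventFrom) {
        this.dbPattern = dbPattern;
        this.eventFrom = eventFrom;
      }

      boolean isBootstrap() {
        return eventFrom == null;
      }
    }

    /** Stand-in for ReplDumpTask: the expensive work now happens here, at execution time. */
    static class DumpTask {
      private final DumpWork work;

      DumpTask(DumpWork work) {
        this.work = work;
      }

      int execute() {
        String type = work.isBootstrap() ? "BOOTSTRAP" : "INCREMENTAL";
        System.out.println("dumping db pattern '" + work.dbPattern + "' as " + type);
        return 0;   // 0 == success, mirroring the Task.execute() return convention
      }
    }

    public static void main(String[] args) {
      // "Semantic analysis" phase: only describe the work and queue a root task.
      List<DumpTask> rootTasks = new ArrayList<>();
      rootTasks.add(new DumpTask(new DumpWork("default", null)));

      // "Execution" phase: the driver later runs the queued tasks.
      for (DumpTask task : rootTasks) {
        task.execute();
      }
    }
  }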


http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 0591dfd..fa68896 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFil
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -159,7 +160,7 @@ public class TestReplicationScenarios {
   private static  int next = 0;
   private synchronized void advanceDumpDir() {
     next++;
-    ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+    ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
   }
 
  static class Tuple {

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index d6b97e8..1128eae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -131,7 +132,7 @@ class WarehouseInstance implements Closeable {
 
   private void advanceDumpDir() {
     next++;
-    ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+    ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
   }
 
   private ArrayList<String> lastResults;

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index cc9af94..dc55805 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -100,6 +100,7 @@ enum StageType {
   STATS,
   DEPENDENCY_COLLECTION,
   COLUMNSTATS,
+  REPLDUMP,
 }
 
 struct Stage {

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index e92d776..7254d50 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -115,7 +115,8 @@ int _kStageTypeValues[] = {
   StageType::MOVE,
   StageType::STATS,
   StageType::DEPENDENCY_COLLECTION,
-  StageType::COLUMNSTATS
+  StageType::COLUMNSTATS,
+  StageType::REPLDUMP
 };
 const char* _kStageTypeNames[] = {
   "CONDITIONAL",
@@ -129,9 +130,10 @@ const char* _kStageTypeNames[] = {
   "MOVE",
   "STATS",
   "DEPENDENCY_COLLECTION",
-  "COLUMNSTATS"
+  "COLUMNSTATS",
+  "REPLDUMP"
 };
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(12, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
 
 Adjacency::~Adjacency() throw() {

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index ce37b2e..38d054b 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -93,7 +93,8 @@ struct StageType {
     MOVE = 8,
     STATS = 9,
     DEPENDENCY_COLLECTION = 10,
-    COLUMNSTATS = 11
+    COLUMNSTATS = 11,
+    REPLDUMP = 12
   };
 };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index f20174c..deca574 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -23,7 +23,8 @@ public enum StageType implements org.apache.thrift.TEnum {
   MOVE(8),
   STATS(9),
   DEPENDENCY_COLLECTION(10),
-  COLUMNSTATS(11);
+  COLUMNSTATS(11),
+  REPLDUMP(12);
 
   private final int value;
 
@@ -68,6 +69,8 @@ public enum StageType implements org.apache.thrift.TEnum {
         return DEPENDENCY_COLLECTION;
       case 11:
         return COLUMNSTATS;
+      case 12:
+        return REPLDUMP;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index e1693f3..4d902ee 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -114,6 +114,7 @@ final class StageType {
   const STATS = 9;
   const DEPENDENCY_COLLECTION = 10;
   const COLUMNSTATS = 11;
+  const REPLDUMP = 12;
   static public $__names = array(
     0 => 'CONDITIONAL',
     1 => 'COPY',
@@ -127,6 +128,7 @@ final class StageType {
     9 => 'STATS',
     10 => 'DEPENDENCY_COLLECTION',
     11 => 'COLUMNSTATS',
+    12 => 'REPLDUMP',
   );
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 2073959..9e29129 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -160,6 +160,7 @@ class StageType:
   STATS = 9
   DEPENDENCY_COLLECTION = 10
   COLUMNSTATS = 11
+  REPLDUMP = 12
 
   _VALUES_TO_NAMES = {
     0: "CONDITIONAL",
@@ -174,6 +175,7 @@ class StageType:
     9: "STATS",
     10: "DEPENDENCY_COLLECTION",
     11: "COLUMNSTATS",
+    12: "REPLDUMP",
   }
 
   _NAMES_TO_VALUES = {
@@ -189,6 +191,7 @@ class StageType:
     "STATS": 9,
     "DEPENDENCY_COLLECTION": 10,
     "COLUMNSTATS": 11,
+    "REPLDUMP": 12,
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index f8b4034..1433d4a 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -72,8 +72,9 @@ module StageType
   STATS = 9
   DEPENDENCY_COLLECTION = 10
   COLUMNSTATS = 11
-  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS"}
-  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS]).freeze
+  REPLDUMP = 12
+  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP"}
+  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP]).freeze
 end
 
 class Adjacency

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index d61a460..94d6c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -25,6 +25,8 @@ import java.util.List;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
@@ -108,7 +110,7 @@ public final class TaskFactory {
         IndexMetadataChangeTask.class));
     taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
     taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
-
+    taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
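
With the ReplDumpWork -> ReplDumpTask mapping registered above, handing the driver a repl-dump task reduces to wrapping a ReplDumpWork. For context, this is the call made in the ReplicationSemanticAnalyzer hunk further down in this mail (reproduced here, not new code):

  Task<ReplDumpWork> replDumpWorkTask = TaskFactory.get(
      new ReplDumpWork(dbNameOrPattern, tblNameOrPattern, eventFrom, eventTo,
          ErrorMsg.INVALID_PATH.getMsg(ast), maxEventLimit,
          ctx.getResFile().toUri().toString()),
      conf);
  rootTasks.add(replDumpWorkTask);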

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
new file mode 100644
index 0000000..f9bdff8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -0,0 +1,347 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.common.collect.Collections2;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
+  private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
+  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+
+  private final static String TMP_TABLE_PREFIX =
+      SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase();
+
+  private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
+  private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
+
+  @Override
+  public String getName() {
+    return "REPL_DUMP";
+  }
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    try {
+      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
+      DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
+      Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
+      Long lastReplId;
+      if (work.isBootStrapDump()) {
+        lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot);
+      } else {
+        lastReplId = incrementalDump(dumpRoot, dmd, cmRoot);
+      }
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
+    } catch (Exception e) {
+      LOG.error("failed", e);
+      return 1;
+    }
+    return 0;
+  }
+
+  private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
+    LOG.debug("prepareReturnValues : " + schema);
+    for (String s : values) {
+      LOG.debug("    > " + s);
+    }
+    Utils.writeOutput(values, new Path(work.resultTempPath), conf);
+  }
+
+  private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+    Long lastReplId;// get list of events matching dbPattern & tblPattern
+    // go through each event, and dump out each event to a event-level dump dir inside dumproot
+
+    // TODO : instead of simply restricting by message format, we should eventually
+    // move to a jdbc-driver-stype registering of message format, and picking message
+    // factory per event to decode. For now, however, since all messages have the
+    // same factory, restricting by message format is effectively a guard against
+    // older leftover data that would cause us problems.
+
+    work.overrideEventTo(getHive());
+
+    IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
+        new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
+        new EventBoundaryFilter(work.eventFrom, work.eventTo),
+        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
+
+    EventUtils.MSClientNotificationFetcher evFetcher
+        = new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+
+    EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+        evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
+
+    lastReplId = work.eventTo;
+    String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
+        ? work.dbNameOrPattern
+        : "?";
+    REPL_STATE_LOG
+        .info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL", dbName);
+    while (evIter.hasNext()) {
+      NotificationEvent ev = evIter.next();
+      lastReplId = ev.getEventId();
+      Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
+      dumpEvent(ev, evRoot, cmRoot);
+    }
+
+    REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}", dbName);
+
+    LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
+    Utils.writeOutput(
+        Arrays.asList(
+            "incremental",
+            String.valueOf(work.eventFrom),
+            String.valueOf(lastReplId)
+        ),
+        dmd.getDumpFilePath(), conf);
+    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
+    dmd.write();
+    return lastReplId;
+  }
+
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
+    EventHandler.Context context = new EventHandler.Context(
+        evRoot,
+        cmRoot,
+        getHive(),
+        conf,
+        getNewEventOnlyReplicationSpec(ev.getEventId())
+    );
+    EventHandlerFactory.handlerFor(ev).handle(context);
+    REPL_STATE_LOG.info(
+        "Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
+        String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
+  }
+
+  private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
+    return getNewReplicationSpec(eventId.toString(), eventId.toString());
+  }
+
+  private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+    // bootstrap case
+    Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+    for (String dbName : matchesDb()) {
+      REPL_STATE_LOG
+          .info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP",
+              dbName);
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+
+      Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+      dumpFunctionMetadata(dbName, dumpRoot);
+      for (String tblName : matchesTbl(dbName, work.tableNameOrPattern)) {
+        LOG.debug(
+            "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+        dumpTable(dbName, tblName, dbRoot);
+      }
+    }
+    Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+    LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
+        bootDumpEndReplId);
+
+    // Now that bootstrap has dumped all objects related, we have to account for the changes
+    // that occurred while bootstrap was happening - i.e. we have to look through all events
+    // during the bootstrap period and consolidate them with our dump.
+
+    IMetaStoreClient.NotificationFilter evFilter =
+        new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
+    EventUtils.MSClientNotificationFetcher evFetcher =
+        new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+    EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+        evFetcher, bootDumpBeginReplId,
+        Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
+        evFilter);
+
+    // Now we consolidate all the events that happenned during the objdump into the objdump
+    while (evIter.hasNext()) {
+      NotificationEvent ev = evIter.next();
+      Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+      // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
+    }
+    LOG.info(
+        "Consolidation done, preparing to return {},{}->{}",
+        dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+    dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
+    dmd.write();
+
+    // Set the correct last repl id to return to the user
+    return bootDumpEndReplId;
+  }
+
+  private Iterable<? extends String> matchesDb() throws HiveException {
+    if (work.dbNameOrPattern == null) {
+      return getHive().getAllDatabases();
+    } else {
+      return getHive().getDatabasesByPattern(work.dbNameOrPattern);
+    }
+  }
+
+  private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
+      throws HiveException {
+    Hive db = Hive.get();
+    if (tblPattern == null) {
+      return Collections2.filter(db.getAllTables(dbName),
+          tableName -> {
+            assert tableName != null;
+            return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX);
+          });
+    } else {
+      return db.getTablesByPattern(dbName, tblPattern);
+    }
+  }
+
+  private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
+    Path dbRoot = new Path(dumpRoot, dbName);
+    // TODO : instantiating FS objects are generally costly. Refactor
+    FileSystem fs = dbRoot.getFileSystem(conf);
+    Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
+    HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName).database();
+    EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
+    REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
+    return dbRoot;
+  }
+
+  private void dumpTable(String dbName, String tblName, Path dbRoot) throws Exception {
+    try {
+      Hive db = getHive();
+      TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+      TableExport.Paths exportPaths =
+          new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
+      new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
+      REPL_STATE_LOG.info(
+          "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}",
+          dbName, tblName, exportPaths.exportRootDir.toString());
+    } catch (InvalidTableException te) {
+      // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
+      // Just log a debug message and skip it.
+      LOG.debug(te.getMessage());
+    }
+  }
+
+  private ReplicationSpec getNewReplicationSpec() throws TException {
+    ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
+    rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC()
+        .getCurrentNotificationEventId().getEventId()));
+    return rspec;
+  }
+
+  private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
+    return new ReplicationSpec(true, false, evState, objState, false, true, true);
+  }
+
+  private String getNextDumpDir() {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // make it easy to write .q unit tests, instead of unique id generation.
+      // however, this does mean that in writing tests, we have to be aware that
+      // repl dump will clash with prior dumps, and thus have to clean up properly.
+      if (work.testInjectDumpDir == null) {
+        return "next";
+      } else {
+        return work.testInjectDumpDir;
+      }
+    } else {
+      return String.valueOf(System.currentTimeMillis());
+      // TODO: time good enough for now - we'll likely improve this.
+      // We may also work in something the equivalent of pid, thrid and move to nanos to ensure
+      // uniqueness.
+    }
+  }
+
+  private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
+    Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
+    List<String> functionNames = getHive().getFunctions(dbName, "*");
+    for (String functionName : functionNames) {
+      HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName);
+      if (tuple == null) {
+        continue;
+      }
+      Path functionRoot = new Path(functionsRoot, functionName);
+      Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
+      try (JsonWriter jsonWriter =
+          new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) {
+        FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
+        serializer.writeTo(jsonWriter, tuple.replicationSpec);
+      }
+      REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
+    }
+  }
+
+  private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName) {
+    try {
+      HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName);
+      if (tuple.object.getResourceUris().isEmpty()) {
+        REPL_STATE_LOG.warn(
+            "Not replicating function: " + functionName + " as it seems to have been created "
+                + "without USING clause");
+        return null;
+      }
+      return tuple;
+    } catch (HiveException e) {
+      //This can happen as we are querying the getFunctions before we are getting the actual function
+      //in between there can be a drop function by a user in which case our call will fail.
+      LOG.info("Function " + functionName
+          + " could not be found, we are ignoring it as it can be a valid state ", e);
+      return null;
+    }
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.REPLDUMP;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
new file mode 100644
index 0000000..1f32be9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -0,0 +1,75 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+@Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER,
+    Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class ReplDumpWork implements Serializable {
+  final String dbNameOrPattern, tableNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
+  final Long eventFrom;
+  Long eventTo;
+  private Integer maxEventLimit;
+  static String testInjectDumpDir = null;
+
+  public static void injectNextDumpDirForTest(String dumpDir) {
+    testInjectDumpDir = dumpDir;
+  }
+
+  public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern,
+      Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
+      String resultTempPath) {
+    this.dbNameOrPattern = dbNameOrPattern;
+    this.tableNameOrPattern = tableNameOrPattern;
+    this.eventFrom = eventFrom;
+    this.eventTo = eventTo;
+    this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+    this.maxEventLimit = maxEventLimit;
+    this.resultTempPath = resultTempPath;
+  }
+
+  boolean isBootStrapDump() {
+    return eventFrom == null;
+  }
+
+  int maxEventLimit() throws Exception {
+    if (eventTo < eventFrom) {
+      throw new Exception("Invalid event ID input received in TO clause");
+    }
+    Integer maxRange = Ints.checkedCast(this.eventTo - eventFrom + 1);
+    if ((maxEventLimit == null) || (maxEventLimit > maxRange)) {
+      maxEventLimit = maxRange;
+    }
+    return maxEventLimit;
+  }
+
+  void overrideEventTo(Hive fromDb) throws Exception {
+    if (eventTo == null) {
+      eventTo = fromDb.getMSC().getCurrentNotificationEventId().getEventId();
+      LoggerFactory.getLogger(this.getClass())
+          .debug("eventTo not specified, using current event id : {}", eventTo);
+    }
+  }
+}
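
A small worked example of the clamping that maxEventLimit() above performs: the user-supplied LIMIT is capped at the number of events actually available between eventFrom and eventTo. Standalone sketch with made-up event ids, not Hive code:

  public class MaxEventLimitSketch {
    public static void main(String[] args) {
      // Made-up values: events 100..150 are available, but the caller asked for 500.
      long eventFrom = 100L, eventTo = 150L;
      Integer requestedLimit = 500;

      int maxRange = (int) (eventTo - eventFrom + 1);          // 51 events in range
      int effectiveLimit = (requestedLimit == null || requestedLimit > maxRange)
          ? maxRange
          : requestedLimit;

      System.out.println(effectiveLimit);                      // prints 51
    }
  }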

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index fdf6c3c..0932dff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -70,7 +71,8 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
     // initialize export path
     String tmpPath = stripQuotes(toTree.getText());
     // All parsing is done, we're now good to start the export process.
-    TableExport.Paths exportPaths = new TableExport.Paths(ast, tmpPath, conf);
+    TableExport.Paths exportPaths =
+        new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
     TableExport.AuthEntities authEntities =
         new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).run();
     inputs.addAll(authEntities.inputs);

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 3d0c736..48d9c94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -19,40 +19,22 @@ package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
-import com.google.common.primitives.Ints;
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
-import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
-import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
-import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
@@ -101,7 +83,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
   private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
-  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
   private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
@@ -180,117 +161,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
         + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
         + String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit));
-    String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
-    Path dumpRoot = new Path(replRoot, getNextDumpDir());
-    DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
-    Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
-    Long lastReplId;
     try {
-      if (eventFrom == null){
-        // bootstrap case
-        Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
-        for (String dbName : matchesDb(dbNameOrPattern)) {
-          REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName);
-          LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
-
-          Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
-          dumpFunctionMetadata(dbName, dumpRoot);
-          for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
-            LOG.debug(
-                "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-            dumpTable(ast, dbName, tblName, dbRoot);
-          }
-          REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump " +
-                              "metadata and data",
-                              dbName, rootTasks.size());
-        }
-        Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
-        LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId);
-
-        // Now that bootstrap has dumped all objects related, we have to account for the changes
-        // that occurred while bootstrap was happening - i.e. we have to look through all events
-        // during the bootstrap period and consolidate them with our dump.
-
-        IMetaStoreClient.NotificationFilter evFilter =
-            new DatabaseAndTableFilter(dbNameOrPattern, tblNameOrPattern);
-        EventUtils.MSClientNotificationFetcher evFetcher =
-            new EventUtils.MSClientNotificationFetcher(db.getMSC());
-        EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
-            evFetcher, bootDumpBeginReplId,
-            Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
-            evFilter );
-
-        // Now we consolidate all the events that happenned during the objdump into the objdump
-        while (evIter.hasNext()){
-          NotificationEvent ev = evIter.next();
-          Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
-          // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
-        }
-        LOG.info(
-            "Consolidation done, preparing to return {},{}->{}",
-            dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
-        dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
-        dmd.write();
-
-        // Set the correct last repl id to return to the user
-        lastReplId = bootDumpEndReplId;
-      } else {
-        // get list of events matching dbPattern & tblPattern
-        // go through each event, and dump out each event to a event-level dump dir inside dumproot
-        if (eventTo == null){
-          eventTo = db.getMSC().getCurrentNotificationEventId().getEventId();
-          LOG.debug("eventTo not specified, using current event id : {}", eventTo);
-        } else if (eventTo < eventFrom) {
-          throw new Exception("Invalid event ID input received in TO clause");
-        }
-
-        Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
-        if ((maxEventLimit == null) || (maxEventLimit > maxRange)){
-          maxEventLimit = maxRange;
-        }
-
-        // TODO : instead of simply restricting by message format, we should eventually
-        // move to a jdbc-driver-stype registering of message format, and picking message
-        // factory per event to decode. For now, however, since all messages have the
-        // same factory, restricting by message format is effectively a guard against
-        // older leftover data that would cause us problems.
-
-        IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
-            new DatabaseAndTableFilter(dbNameOrPattern, tblNameOrPattern),
-            new EventBoundaryFilter(eventFrom, eventTo),
-            new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
-
-        EventUtils.MSClientNotificationFetcher evFetcher
-            = new EventUtils.MSClientNotificationFetcher(db.getMSC());
-
-        EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
-            evFetcher, eventFrom, maxEventLimit, evFilter);
-
-        lastReplId = eventTo;
-        REPL_STATE_LOG.info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL",
-                            (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
-        while (evIter.hasNext()){
-          NotificationEvent ev = evIter.next();
-          lastReplId = ev.getEventId();
-          Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
-          dumpEvent(ev, evRoot, cmRoot);
-        }
-
-        REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}",
-                            (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
-
-        LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
-        Utils.writeOutput(
-            Arrays.asList(
-                "incremental",
-                String.valueOf(eventFrom),
-                String.valueOf(lastReplId)
-            ),
-            dmd.getDumpFilePath(), conf);
-        dmd.setDump(DumpType.INCREMENTAL, eventFrom, lastReplId, cmRoot);
-        dmd.write();
-      }
-      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
+      ctx.setResFile(ctx.getLocalTmpPath());
+      Task<ReplDumpWork> replDumpWorkTask = TaskFactory
+          .get(new ReplDumpWork(
+              dbNameOrPattern,
+              tblNameOrPattern,
+              eventFrom,
+              eventTo,
+              ErrorMsg.INVALID_PATH.getMsg(ast),
+              maxEventLimit,
+              ctx.getResFile().toUri().toString()
+          ), conf);
+      rootTasks.add(replDumpWorkTask);
       setFetchTask(createFetchTask(dumpSchema));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
@@ -299,119 +182,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
-    EventHandler.Context context = new EventHandler.Context(
-        evRoot,
-        cmRoot,
-        db,
-        conf,
-        getNewEventOnlyReplicationSpec(ev.getEventId())
-    );
-    EventHandlerFactory.handlerFor(ev).handle(context);
-    REPL_STATE_LOG.info("Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
-                        String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
-  }
-
-  public static void injectNextDumpDirForTest(String dumpdir){
-    testInjectDumpDir = dumpdir;
-  }
-
-  private String getNextDumpDir() {
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
-      // make it easy to write .q unit tests, instead of unique id generation.
-      // however, this does mean that in writing tests, we have to be aware that
-      // repl dump will clash with prior dumps, and thus have to clean up properly.
-      if (testInjectDumpDir == null){
-        return "next";
-      } else {
-        return testInjectDumpDir;
-      }
-    } else {
-      return String.valueOf(System.currentTimeMillis());
-      // TODO: time good enough for now - we'll likely improve this.
-      // We may also work in something the equivalent of pid, thrid and move to nanos to ensure
-      // uniqueness.
-    }
-  }
-
-  /**
-   *
-   * @param dbName
-   * @param dumpRoot
-   * @return db dumped path
-   * @throws SemanticException
-   */
-  private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
-    Path dbRoot = new Path(dumpRoot, dbName);
-    try {
-      // TODO : instantiating FS objects are generally costly. Refactor
-      FileSystem fs = dbRoot.getFileSystem(conf);
-      Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
-      HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
-      inputs.add(new ReadEntity(database.object));
-      EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
-      REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
-    } catch (Exception e) {
-      // TODO : simple wrap & rethrow for now, clean up with error codes
-      throw new SemanticException(e);
-    }
-    return dbRoot;
-  }
-
-  private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
-    Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
-    try {
-      List<String> functionNames = db.getFunctions(dbName, "*");
-      for (String functionName : functionNames) {
-        HiveWrapper.Tuple<Function> tuple;
-        try {
-          tuple = new HiveWrapper(db, dbName).function(functionName);
-        } catch (HiveException e) {
-          //This can happen as we are querying the getFunctions before we are getting the actual function
-          //in between there can be a drop function by a user in which case our call will fail.
-          LOG.info("Function " + functionName + " could not be found, we are ignoring it as it can be a valid state ", e);
-          continue;
-        }
-        if (tuple.object.getResourceUris().isEmpty()) {
-          REPL_STATE_LOG.warn(
-              "Not replicating function: " + functionName + " as it seems to have been created "
-                  + "without USING clause");
-          continue;
-        }
-
-        Path functionRoot = new Path(functionsRoot, functionName);
-        Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
-        try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf),
-            functionMetadataRoot)) {
-          FunctionSerializer serializer =
-              new FunctionSerializer(tuple.object, conf);
-          serializer.writeTo(jsonWriter, tuple.replicationSpec);
-        }
-        REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
-      }
-    } catch (Exception e) {
-      throw new SemanticException(e);
-    }
-  }
-
-  private void dumpTable(ASTNode ast, String dbName, String tblName, Path dbRoot)
-      throws SemanticException {
-    try {
-      TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
-      TableExport.Paths exportPaths = new TableExport.Paths(ast, dbRoot, tblName, conf);
-      new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
-      REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata " +
-          "and data to path {}", dbName, tblName, exportPaths.exportRootDir.toString());
-    } catch (InvalidTableException te) {
-      // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
-      // Just log a debug message and skip it.
-      LOG.debug(te.getMessage());
-    } catch (HiveException e) {
-      // TODO : simple wrap & rethrow for now, clean up with error codes
-      throw new SemanticException(e);
-    }
-  }
-
   // REPL LOAD
   private void initReplLoad(ASTNode ast) {
     int numChildren = ast.getChildCount();
@@ -953,53 +723,4 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     ctx.setResFile(ctx.getLocalTmpPath());
     Utils.writeOutput(values, ctx.getResFile(), conf);
   }
-
-  private ReplicationSpec getNewReplicationSpec() throws SemanticException {
-    try {
-      ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
-      rspec.setCurrentReplicationState(String.valueOf(db.getMSC()
-          .getCurrentNotificationEventId().getEventId()));
-      return rspec;
-    } catch (Exception e) {
-      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error codes
-    }
-  }
-
-  // Use for specifying object state as well as event state
-  private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
-    return new ReplicationSpec(true, false, evState, objState, false, true, true);
-  }
-
-  // Use for replication states focused on event only, where the obj state will be the event state
-  private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
-    return getNewReplicationSpec(eventId.toString(), eventId.toString());
-  }
-
-  private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
-      throws HiveException {
-    if (tblPattern == null) {
-      return removeValuesTemporaryTables(db.getAllTables(dbName));
-    } else {
-      return db.getTablesByPattern(dbName, tblPattern);
-    }
-  }
-
-  private final static String TMP_TABLE_PREFIX =
-      SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase();
-
-  static Iterable<String> removeValuesTemporaryTables(List<String> tableNames) {
-    return Collections2.filter(tableNames,
-        tableName -> {
-          assert tableName != null;
-          return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX);
-        });
-  }
-
-  private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
-    if (dbPattern == null) {
-      return db.getAllDatabases();
-    } else {
-      return db.getDatabasesByPattern(dbPattern);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 144d667..9f22f23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -62,6 +61,9 @@ public class TableExport {
         ? null
         : tableSpec;
     this.replicationSpec = replicationSpec;
+    if (this.tableSpec != null && this.tableSpec.tableHandle.isView()) {
+      this.replicationSpec.setIsMetadataOnly(true);
+    }
     this.db = db;
     this.conf = conf;
     this.logger = logger;
@@ -72,6 +74,7 @@ public class TableExport {
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
+      //first we should get the correct replication spec before doing metadata/data export
       if (tableSpec.tableHandle.isView()) {
         replicationSpec.setIsMetadataOnly(true);
       }
@@ -111,7 +114,8 @@ public class TableExport {
     }
   }
 
-  private void writeMetaData(PartitionIterable partitions) throws SemanticException {
+  private void writeMetaData(PartitionIterable partitions)
+      throws SemanticException {
     try {
       EximUtil.createExportDump(
           paths.exportFileSystem,
@@ -168,13 +172,14 @@ public class TableExport {
    * directory creation.
    */
   public static class Paths {
-    private final ASTNode ast;
+    private final String astRepresentationForErrorMsg;
     private final HiveConf conf;
     public final Path exportRootDir;
     private final FileSystem exportFileSystem;
 
-    public Paths(ASTNode ast, Path dbRoot, String tblName, HiveConf conf) throws SemanticException {
-      this.ast = ast;
+    public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName,
+        HiveConf conf) throws SemanticException {
+      this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
       this.conf = conf;
       Path tableRoot = new Path(dbRoot, tblName);
       URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
@@ -190,8 +195,9 @@ public class TableExport {
       }
     }
 
-    public Paths(ASTNode ast, String path, HiveConf conf) throws SemanticException {
-      this.ast = ast;
+    public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf)
+        throws SemanticException {
+      this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
       this.conf = conf;
       this.exportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
       try {
@@ -245,21 +251,21 @@ public class TableExport {
           FileStatus tgt = fs.getFileStatus(toPath);
           // target exists
           if (!tgt.isDirectory()) {
-            throw new SemanticException(ErrorMsg.INVALID_PATH
-                .getMsg(ast, "Target is not a directory : " + rootDirExportFile));
+            throw new SemanticException(
+                astRepresentationForErrorMsg + ": " + "Target is not a directory : "
+                    + rootDirExportFile);
           } else {
             FileStatus[] files = fs.listStatus(toPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
             if (files != null && files.length != 0) {
               throw new SemanticException(
-                  ErrorMsg.INVALID_PATH
-                      .getMsg(ast, "Target is not an empty directory : " + rootDirExportFile)
-              );
+                  astRepresentationForErrorMsg + ": " + "Target is not an empty directory : "
+                      + rootDirExportFile);
             }
           }
         } catch (FileNotFoundException ignored) {
         }
       } catch (IOException e) {
-        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e);
+        throw new SemanticException(astRepresentationForErrorMsg, e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index 1cb4470..17cf4d0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -1,33 +1,28 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.*;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,14 +40,15 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 public class TestReplicationSemanticAnalyzer {
-  static QueryState queryState;
+  private static QueryState queryState;
   static HiveConf conf;
-  static String defaultDB = "default";
-  static String tblName = "testReplSA";
-  static ArrayList<String> cols =  new ArrayList<String>(Arrays.asList("col1", "col2"));
-  ParseDriver pd;
-  SemanticAnalyzer sA;
+  private static String defaultDB = "default";
+  private static String tblName = "testReplSA";
+  private static ArrayList<String> cols =  new ArrayList<String>(Arrays.asList("col1", "col2"));
 
   @BeforeClass
   public static void initialize() throws HiveException {
@@ -221,12 +217,6 @@ public class TestReplicationSemanticAnalyzer {
     assertEquals(child.getChildCount(), 0);
   }
 
-  // TODO: add this test after repl dump analyze generates tasks
-  //@Test
-  public void testReplDumpAnalyze() throws Exception {
-
-  }
-
   //@Test
   public void testReplLoadAnalyze() throws Exception {
     ParseDriver pd = new ParseDriver();
@@ -274,19 +264,4 @@ public class TestReplicationSemanticAnalyzer {
     FetchTask fetchTask = rs.getFetchTask();
     assertNotNull(fetchTask);
   }
-
-  @Test
-  public void removeTemporaryTablesForMetadataDump() {
-    List<String> validTables = ImmutableList.copyOf(
-        ReplicationSemanticAnalyzer.removeValuesTemporaryTables(new ArrayList<String>() {{
-          add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "a");
-          add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "b");
-          add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "c");
-          add("c");
-          add("b");
-          add("a");
-        }}));
-    assertThat(validTables.size(), is(equalTo(3)));
-    assertThat(validTables, hasItems("a", "b", "c"));
-  }
 }