Posted to commits@hive.apache.org by vg...@apache.org on 2016/11/14 04:15:08 UTC

[1/2] hive git commit: HIVE-15151: Bootstrap support for replv2 (Sushanth Sowmyan reviewed by Vaibhav Gumashta)

Repository: hive
Updated Branches:
  refs/heads/master 532e6a72e -> 739ac3af3


http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..a4dfa3a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
+
+public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
+  // Database name or pattern
+  private String dbNameOrPattern;
+  // Table name or pattern
+  private String tblNameOrPattern;
+  private Integer eventFrom;
+  private Integer eventTo;
+  private Integer batchSize;
+  // Base path for REPL LOAD
+  private String path;
+
+  public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
+    super(queryState);
+  }
+
+  @Override
+  public void analyzeInternal(ASTNode ast) throws SemanticException {
+    LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal");
+    LOG.debug(ast.getName() + ":" + ast.getToken().getText() + "=" + ast.getText());
+    switch (ast.getToken().getType()) {
+    case TOK_REPL_DUMP: {
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
+      initReplDump(ast);
+      analyzeReplDump(ast);
+      break;
+    }
+    case TOK_REPL_LOAD: {
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
+      initReplLoad(ast);
+      analyzeReplLoad(ast);
+      break;
+    }
+    case TOK_REPL_STATUS: {
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
+      initReplStatus(ast);
+      analyzeReplStatus(ast);
+      break;
+    }
+    default: {
+      throw new SemanticException("Unexpected root token");
+    }
+    }
+  }
+
+  private void initReplDump(ASTNode ast) {
+    int numChildren = ast.getChildCount();
+    dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    // skip the first node, which is always required
+    int currNode = 1;
+    while (currNode < numChildren) {
+      if (ast.getChild(currNode).getType() != TOK_FROM) {
+        // optional tblName was specified.
+        tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getText());
+      } else {
+        // TOK_FROM subtree
+        Tree fromNode = ast.getChild(currNode);
+        eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
+        // skip the first, which is always required
+        int numChild = 1;
+        while (numChild < fromNode.getChildCount()) {
+          if (fromNode.getChild(numChild).getType() == TOK_TO) {
+            eventTo =
+                Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+            // skip the next child, since we already took care of it
+            numChild++;
+          } else if (fromNode.getChild(numChild).getType() == TOK_BATCH) {
+            batchSize =
+                Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+            // skip the next child, since we already took care of it
+            numChild++;
+          }
+          // move to the next child in FROM tree
+          numChild++;
+        }
+        // FROM node is always the last
+        break;
+      }
+      // move to the next root node
+      currNode++;
+    }
+  }
+
+  // REPL DUMP
+  private void analyzeReplDump(ASTNode ast) throws SemanticException {
+    // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
+        + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
+        + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
+    String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+    Path dumpRoot = new Path(replRoot, getNextDumpDir());
+    try {
+      for (String dbName : matchesDb(dbNameOrPattern)) {
+        LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+        Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+        for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
+          LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
+              + " to db root " + dbRoot.toUri());
+          dumpTbl(ast, dbName, tblName, dbRoot);
+        }
+      }
+      String currentReplId =
+          String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
+          "dump_dir,last_repl_id#string,string");
+    } catch (Exception e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+  }
+
+  String getNextDumpDir() {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      return "next";
+      // make it easy to write unit tests, instead of unique id generation.
+      // however, this does mean that in writing tests, we have to be aware that
+      // repl dump will clash with prior dumps, and thus have to clean up properly.
+    } else {
+      return String.valueOf(System.currentTimeMillis());
+      // TODO: time is good enough for now - we'll likely improve this.
+      // We may also work in something equivalent to a pid/thread id, and move to nanos, to
+      // ensure uniqueness.
+    }
+  }
+
+  /**
+   * Dumps the database metadata (a _metadata file) under the dump root.
+   *
+   * @param dbName name of the database to dump
+   * @param dumpRoot root dir of the current dump
+   * @return db dumped path
+   * @throws SemanticException
+   */
+  private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
+    Path dbRoot = new Path(dumpRoot, dbName);
+    try {
+      // TODO : instantiating FS objects are generally costly. Refactor
+      FileSystem fs = dbRoot.getFileSystem(conf);
+      Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
+      Database dbObj = db.getDatabase(dbName);
+      EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
+    } catch (Exception e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+    return dbRoot;
+  }
+
+  /**
+   * Dumps a table's metadata and data (as an EXPORT would) under the db dump dir.
+   *
+   * @param ast the REPL DUMP AST node
+   * @param dbName name of the database the table belongs to
+   * @param tblName name of the table to dump
+   * @param dbRoot dump dir of the enclosing database
+   * @return tbl dumped path
+   * @throws SemanticException
+   */
+  private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException {
+    Path tableRoot = new Path(dbRoot, tblName);
+    try {
+      URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
+      TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+      ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
+          rootTasks, inputs, outputs, LOG);
+    } catch (HiveException e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+    return tableRoot;
+  }
+
+  // REPL LOAD
+  private void initReplLoad(ASTNode ast) {
+    int numChildren = ast.getChildCount();
+    path = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    if (numChildren > 1) {
+      dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+    }
+    if (numChildren > 2) {
+      tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(2).getText());
+    }
+  }
+
+  /*
+   * Example dump dirs we need to be able to handle :
+   *
+   * for: hive.repl.rootdir = staging/
+   * Then, repl dumps will be created in staging/<dumpdir>
+   *
+   * single-db-dump: staging/blah12345
+   *   blah12345/
+   *     default/
+   *       _metadata
+   *       tbl1/
+   *         _metadata
+   *         dt=20160907/
+   *           _files
+   *       tbl2/
+   *       tbl3/
+   *       unptn_tbl/
+   *         _metadata
+   *         _files
+   *
+   * multi-db-dump: staging/bar12347
+   *   staging/
+   *     bar12347/
+   *       default/
+   *         ...
+   *       sales/
+   *         ...
+   *
+   * single table-dump: staging/baz123
+   *   staging/
+   *     baz123/
+   *       _metadata
+   *       dt=20150931/
+   *         _files
+   */
+  private void analyzeReplLoad(ASTNode ast) throws SemanticException {
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
+        + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path));
+
+    // for analyze repl load, we walk through the dir structure available in the path,
+    // looking at each db, and then each table, and then setting up the appropriate
+    // import job in its place.
+
+    // FIXME : handle non-bootstrap cases.
+
+    // We look at the path, and go through each subdir.
+    // Each subdir corresponds to a database.
+    // For each subdir, there is a _metadata file which allows us to re-create the db object.
+    // After each db object is loaded appropriately, we iterate through the sub-table dirs, and
+    // set up what is effectively an IMPORT on each of them, into this db.
+
+    try {
+
+      Path loadPath = new Path(path);
+      final FileSystem fs = loadPath.getFileSystem(conf);
+
+      if (!fs.exists(loadPath)) {
+        // supposed dump path does not exist.
+        throw new FileNotFoundException(loadPath.toUri().toString());
+      }
+
+      // Now, the dumped path can be one of two things:
+      // a) It can be a db dump, in which case we expect a set of dirs, each with a
+      // db name, and with a _metadata file in each, and table dirs inside that.
+      // b) It can be a table dump dir, in which case we expect a _metadata dump of
+      // a table in question in the dir, and individual ptn dir hierarchy.
+      // Once we expand this into doing incremental repl, we can have individual events which can
+      // be other things like roles and fns as well. Also, if tblname is specified, we're guaranteed
+      // that this is a tbl-level dump, and it is an error condition if we find anything else. Also,
+      // if dbname is specified, we expect exactly one db dumped, and having more is an error
+      // condition.
+
+      if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+        analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
+        return;
+      }
+
+      FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
+      if (srcs == null || (srcs.length == 0)) {
+        throw new FileNotFoundException(loadPath.toUri().toString());
+      }
+
+      FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
+
+      if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
+        throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
+      }
+
+      if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
+        LOG.debug("Found multiple dirs when we expected 1:");
+        for (FileStatus d : dirsInLoadPath) {
+          LOG.debug("> " + d.getPath().toUri().toString());
+        }
+        throw new IllegalArgumentException(
+            "Multiple dirs in "
+                + loadPath.toUri().toString()
+                + " does not correspond to REPL LOAD, which expects to load to a single destination point.");
+      }
+
+      for (FileStatus dir : dirsInLoadPath) {
+        analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+      }
+
+    } catch (Exception e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+
+  }
+
+  private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
+      throws SemanticException {
+    try {
+      // Path being passed to us is a db dump location. We go ahead and load as needed.
+      // dbName might be null or empty, in which case we keep the original db name for the new
+      // database creation
+
+      // Two steps here - first, we read the _metadata file here, and create a CreateDatabaseDesc
+      // associated with that
+      // Then, we iterate over all subdirs, and create table imports for each.
+
+      EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+      try {
+        rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
+      } catch (IOException e) {
+        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+      }
+
+      Database dbObj = rv.getDatabase();
+
+      if (dbObj == null) {
+        throw new IllegalArgumentException(
+            "_metadata file read did not contain a db object - invalid dump.");
+      }
+
+      if ((dbName == null) || (dbName.isEmpty())) {
+        // We use the dbName specified as long as it is not null/empty. Otherwise, we fall back
+        // to the original name recorded in the thrift object.
+        dbName = dbObj.getName();
+      }
+
+      CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
+      createDbDesc.setName(dbName);
+      createDbDesc.setComment(dbObj.getDescription());
+      createDbDesc.setDatabaseProperties(dbObj.getParameters());
+      // note that we do not set location - for repl load, we want that auto-created.
+
+      createDbDesc.setIfNotExists(false);
+      // If the db already exists, we want this to be an error condition. Repl Load is not
+      // intended to replace an existing db.
+      // TODO: we might revisit this in create-drop-recreate cases; it needs some more thought.
+      Task<? extends Serializable> createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf);
+      rootTasks.add(createDbTask);
+
+      FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
+
+      for (FileStatus tableDir : dirsInDbPath) {
+        analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+      }
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private void analyzeTableLoad(String dbName, String tblName, String locn,
+      Task<? extends Serializable> precursor) throws SemanticException {
+    // Path being passed to us is a table dump location. We go ahead and load it in as needed.
+    // If tblName is null, then we default to the table name specified in _metadata.
+    // If tblName is specified, that is the name we are intended to create the new table as.
+    if (dbName == null || dbName.isEmpty()) {
+      throw new SemanticException("Database name cannot be null for a table load");
+    }
+    try {
+      // no location set on repl loads
+      boolean isLocationSet = false;
+      // all repl imports are non-external
+      boolean isExternalSet = false;
+      // bootstrap loads are not partition level
+      boolean isPartSpecSet = false;
+      // repl loads are not partition level
+      LinkedHashMap<String, String> parsedPartSpec = null;
+      // no location for repl imports
+      String parsedLocation = null;
+      boolean waitOnCreateDb = false;
+      List<Task<? extends Serializable>> importTasks = null;
+      if (precursor == null) {
+        importTasks = rootTasks;
+        waitOnCreateDb = false;
+      } else {
+        importTasks = new ArrayList<Task<? extends Serializable>>();
+        waitOnCreateDb = true;
+      }
+      EximUtil.SemanticAnalyzerWrapperContext x =
+          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
+              ctx);
+      ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
+          waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
+
+      if (precursor != null) {
+        for (Task<? extends Serializable> t : importTasks) {
+          precursor.addDependentTask(t);
+        }
+      }
+
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  // REPL STATUS
+  private void initReplStatus(ASTNode ast) {
+    int numChildren = ast.getChildCount();
+    dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    if (numChildren > 1) {
+      tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+    }
+  }
+
+  private void analyzeReplStatus(ASTNode ast) throws SemanticException {
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern)
+        + "." + String.valueOf(tblNameOrPattern));
+
+    String replLastId = null;
+
+    try {
+      if (tblNameOrPattern != null) {
+        // Checking for status of table
+        Table tbl = db.getTable(dbNameOrPattern, tblNameOrPattern);
+        if (tbl != null) {
+          inputs.add(new ReadEntity(tbl));
+          Map<String, String> params = tbl.getParameters();
+          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
+            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+          }
+        }
+      } else {
+        // Checking for status of a db
+        Database database = db.getDatabase(dbNameOrPattern);
+        if (database != null) {
+          inputs.add(new ReadEntity(database));
+          Map<String, String> params = database.getParameters();
+          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
+            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+          }
+        }
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+                                      // codes
+    }
+
+    LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
+        + ctx.getResFile());
+    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+  }
+
+  private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
+    LOG.debug("prepareReturnValues : " + schema);
+    for (String s : values) {
+      LOG.debug("    > " + s);
+    }
+
+    ctx.setResFile(ctx.getLocalTmpPath());
+    // FIXME : this should not be accessible by the user if we write to it from the frontend.
+    // Thus, we should Desc/Work this, otherwise there is a security issue here.
+    // Note: if we don't call ctx.setResFile, we get an NPE from the code that follows.
+    // If we do call it, then FetchWork winds up treating the "table" here as a partitioned dir,
+    // which does not work either.
+
+    writeOutput(values);
+  }
+
+  private void writeOutput(List<String> values) throws SemanticException {
+    Path outputFile = ctx.getResFile();
+    FileSystem fs = null;
+    DataOutputStream outStream = null;
+    try {
+      fs = outputFile.getFileSystem(conf);
+      outStream = fs.create(outputFile);
+      outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+      for (int i = 1; i < values.size(); i++) {
+        outStream.write(Utilities.ctrlaCode);
+        outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+      }
+      outStream.write(Utilities.newLineCode);
+    } catch (IOException e) {
+      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+                                      // codes
+    } finally {
+      IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
+                                      // replace with this
+    }
+  }
+
+  private ReplicationSpec getNewReplicationSpec() throws SemanticException {
+    try {
+      ReplicationSpec replicationSpec =
+          new ReplicationSpec(true, false, "replv2", "will-be-set", false, true);
+      replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC()
+          .getCurrentNotificationEventId().getEventId()));
+      return replicationSpec;
+    } catch (Exception e) {
+      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+                                      // codes
+    }
+  }
+
+  private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
+      throws HiveException {
+    if (tblPattern == null) {
+      return db.getAllTables(dbName);
+    } else {
+      return db.getTablesByPattern(dbName, tblPattern);
+    }
+  }
+
+  private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
+    if (dbPattern == null) {
+      return db.getAllDatabases();
+    } else {
+      return db.getDatabasesByPattern(dbPattern);
+    }
+  }
+
+}
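
For orientation, here is a minimal, hypothetical sketch of how the three commands handled by this analyzer might be driven from a client over JDBC. The connection URL, user, destination database name and dump path are placeholders; the column positions follow the schemas prepared by prepareReturnValues above ("dump_dir,last_repl_id" for REPL DUMP, "last_repl_id" for REPL STATUS).

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class ReplCommandsSketch {
    public static void main(String[] args) throws Exception {
      try (Connection conn = DriverManager.getConnection(
               "jdbc:hive2://localhost:10000/default", "hive", "");
           Statement stmt = conn.createStatement()) {

        // Bootstrap dump of a database: returns the dump dir and the last event id.
        try (ResultSet rs = stmt.executeQuery("REPL DUMP default")) {
          while (rs.next()) {
            System.out.println("dump_dir=" + rs.getString(1)
                + " last_repl_id=" + rs.getString(2));
          }
        }

        // Load the dump into a destination database (path as returned by REPL DUMP).
        stmt.execute("REPL LOAD default_replica FROM '/user/hive/repl/next'");

        // Read the replication state recorded on the destination database.
        try (ResultSet rs = stmt.executeQuery("REPL STATUS default_replica")) {
          while (rs.next()) {
            System.out.println("last_repl_id=" + rs.getString(1));
          }
        }
      }
    }
  }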

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 4668271..824cf11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -42,6 +42,7 @@ public class ReplicationSpec {
   private String eventId = null;
   private String currStateId = null;
   private boolean isNoop = false;
+  private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
 
 
   // Key definitions related to replication
@@ -49,8 +50,9 @@ public class ReplicationSpec {
     REPL_SCOPE("repl.scope"),
     EVENT_ID("repl.event.id"),
     CURR_STATE_ID("repl.last.id"),
-    NOOP("repl.noop");
-
+    NOOP("repl.noop"),
+    LAZY("repl.lazy"),
+    ;
     private final String keyName;
 
     KEY(String s) {
@@ -102,32 +104,32 @@ public class ReplicationSpec {
     this((ASTNode)null);
   }
 
-  public  ReplicationSpec(
-      boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState,
-      String currentReplicationState, boolean isNoop){
+  public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
+      String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
+    this.isLazy = isLazy;
   }
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
     String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
     this.isMetadataOnly = false;
     this.isInReplicationScope = false;
-    if (scope != null){
-      if (scope.equalsIgnoreCase("metadata")){
+    if (scope != null) {
+      if (scope.equalsIgnoreCase("metadata")) {
         this.isMetadataOnly = true;
         this.isInReplicationScope = true;
-      } else if (scope.equalsIgnoreCase("all")){
+      } else if (scope.equalsIgnoreCase("all")) {
         this.isInReplicationScope = true;
       }
     }
     this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
     this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-    this.isNoop = Boolean.parseBoolean(
-        keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+    this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+    this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
   }
 
   /**
@@ -280,6 +282,21 @@ public class ReplicationSpec {
     this.isNoop = isNoop;
   }
 
+  /**
+   * @return whether or not the current replication action is set to be lazy
+   */
+  public boolean isLazy() {
+    return isLazy;
+  }
+
+  /**
+   * @param isLazy whether or not the current replication action should be lazy
+   */
+  public void setLazy(boolean isLazy){
+    this.isLazy = isLazy;
+  }
+
+
   public String get(KEY key) {
     switch (key){
       case REPL_SCOPE:
@@ -297,6 +314,8 @@ public class ReplicationSpec {
         return getCurrentReplicationState();
       case NOOP:
         return String.valueOf(isNoop());
+      case LAZY:
+        return String.valueOf(isLazy());
     }
     return null;
   }
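
As a plain-Java illustration (no Hive classes; the parameter values shown are hypothetical), this is roughly what the keyFetcher-style constructor above expects to find in a replicated object's parameters, including the new repl.lazy key:

  import java.util.HashMap;
  import java.util.Map;

  public class ReplSpecKeysSketch {
    public static void main(String[] args) {
      // Parameters as they might appear on a replicated table or database object.
      Map<String, String> params = new HashMap<>();
      params.put("repl.scope", "all");      // KEY.REPL_SCOPE: "all" or "metadata"
      params.put("repl.last.id", "200");    // KEY.CURR_STATE_ID: read back by REPL STATUS
      params.put("repl.noop", "false");     // KEY.NOOP
      params.put("repl.lazy", "true");      // KEY.LAZY: new in this patch

      // Per key, the constructor does essentially this kind of lookup-and-parse:
      String scope = params.get("repl.scope");
      boolean inReplScope = "all".equalsIgnoreCase(scope) || "metadata".equalsIgnoreCase(scope);
      boolean isLazy = Boolean.parseBoolean(params.get("repl.lazy"));

      System.out.println("inReplScope=" + inReplScope + " lazy=" + isLazy
          + " lastReplId=" + params.get("repl.last.id"));
    }
  }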

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index ed01a31..520d3de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -128,6 +128,10 @@ public final class SemanticAnalyzerFactory {
     commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT);
     commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK);
     commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT);
+    commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now
+    commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now
+    commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE
+
   }
 
   static {
@@ -185,6 +189,12 @@ public final class SemanticAnalyzerFactory {
         return new ExportSemanticAnalyzer(queryState);
       case HiveParser.TOK_IMPORT:
         return new ImportSemanticAnalyzer(queryState);
+      case HiveParser.TOK_REPL_DUMP:
+        return new ReplicationSemanticAnalyzer(queryState);
+      case HiveParser.TOK_REPL_LOAD:
+        return new ReplicationSemanticAnalyzer(queryState);
+      case HiveParser.TOK_REPL_STATUS:
+        return new ReplicationSemanticAnalyzer(queryState);
       case HiveParser.TOK_ALTERTABLE: {
         Tree child = tree.getChild(1);
         switch (child.getType()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
new file mode 100644
index 0000000..1932d60
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+/**
+ * Marker work for Replication - behaves similarly to CopyWork, but maps to ReplCopyTask,
+ * which, if so configured, lists the files at the source and writes that list to the
+ * destination instead of copying the files themselves, falling back to copying when needed.
+ */
+@Explain(displayName = "Copy for Replication", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class ReplCopyWork extends CopyWork {
+
+  protected boolean copyFiles = true; // governs copy-or-list-files behaviour.
+  // If set to true, behaves identically to a CopyWork
+  // If set to false, ReplCopyTask does a file-list of the things to be copied instead, and puts them in a file called _files.
+  // Default is set to mimic CopyTask, with the intent that any Replication code will explicitly flip this.
+
+  /**
+   * TODO : Refactor
+   *
+   * There is an upcoming patch that refactors this bit of code. Currently, the idea is the following:
+   *
+   * By default, ReplCopyWork will behave similarly to CopyWork, and simply copy
+   * along data from the source to destination. If, however, listFilesOnOutput is set,
+   * then, instead of copying the individual files to the destination, it simply creates
+   * a file called _files on destination that contains the list of the original files
+   * that were intended to be copied. Thus, we do not actually copy the files at CopyWork
+   * time.
+   *
+   * The flip side of this behaviour happens when, instead, readListFromInput is set. This
+   * flag, if set, changes the source behaviour of this CopyTask, and instead of copying
+   * explicit files, this will then fall back to a behaviour wherein an _files is read from
+   * the source, and the files specified by the _files are then copied to the destination.
+   *
+   * This gives us a lazy-copy-on-source and a pull-from-destination semantic that we want
+   * to use for replication.
+   *
+   * ==
+   *
+   * The refactor intent, however, is to simplify this, so that we have only 1 flag that we set,
+   * called isLazy. If isLazy is set, then this is the equivalent of the current listFilesOnOutput,
+   * and will generate a _files file.
+   *
+   * As to the input, we simply decide on whether to use the lazy mode or not depending on the
+   * presence of a _files file on the input. If we see a _files on the input, we simply expand it
+   * to copy as needed. If we do not, we copy as normal.
+   *
+   */
+
+  protected boolean listFilesOnOutput = false; // governs copy-or-list-files behaviour
+  // If set to true, it'll iterate over input files, and for each file in the input,
+  //   it'll write out an additional line in a _files file in the output.
+  // If set to false, it'll behave as a traditional CopyTask.
+
+  protected boolean readListFromInput = false; // governs remote-fetch-input behaviour
+  // If set to true, we'll assume that the input has a _files file present which lists
+  //   the actual input files to copy, and we'll pull each of those on read.
+  // If set to false, it'll behave as a traditional CopyTask.
+
+  public ReplCopyWork() {
+  }
+
+  public ReplCopyWork(final Path fromPath, final Path toPath) {
+    super(fromPath, toPath, true);
+  }
+
+  public ReplCopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+    super(fromPath, toPath, errorOnSrcEmpty);
+  }
+
+  public void setListFilesOnOutputBehaviour(boolean listFilesOnOutput){
+    this.listFilesOnOutput = listFilesOnOutput;
+  }
+
+  public boolean getListFilesOnOutputBehaviour(){
+    return this.listFilesOnOutput;
+  }
+
+  public void setReadListFromInput(boolean readListFromInput){
+    this.readListFromInput = readListFromInput;
+  }
+
+  public boolean getReadListFromInput(){
+    return this.readListFromInput;
+  }
+
+  // specialization of getListFilesOnOutputBehaviour, with a FileStatus arg
+  // we can default to the plain getListFilesOnOutputBehaviour behaviour,
+  // or we can do additional pattern matching to decide that certain files
+  // should not be listed but copied instead - _metadata files, for instance.
+  // Currently, we use this to skip _metadata files, but we might decide that
+  // this is not the right place for it later on.
+  public boolean getListFilesOnOutputBehaviour(FileStatus f) {
+    if (f.getPath().toString().contains("_metadata")){
+      return false; // always copy _metadata files
+    }
+    return this.listFilesOnOutput;
+  }
+}
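
To make the two flags concrete, here is a small sketch (paths are hypothetical) of how a dump-side and a load-side ReplCopyWork would be configured in lazy mode, mirroring what ReplCopyTask.getDumpCopyTask/getLoadCopyTask later in this commit do when replicationSpec.isLazy() is set:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.plan.ReplCopyWork;

  public class ReplCopyWorkSketch {
    public static void main(String[] args) {
      Path warehouseDir = new Path("/user/hive/warehouse/default.db/tbl1");
      Path dumpDir = new Path("/user/hive/repl/next/default/tbl1");

      // Dump side: instead of copying data, write a _files listing at the destination.
      ReplCopyWork dumpWork = new ReplCopyWork(warehouseDir, dumpDir, false);
      dumpWork.setListFilesOnOutputBehaviour(true);

      // Load side: expect a _files listing at the source and pull the files it names.
      ReplCopyWork loadWork = new ReplCopyWork(dumpDir,
          new Path("/user/hive/warehouse/default_replica.db/tbl1"), false);
      loadWork.setReadListFromInput(true);

      System.out.println("dump lists files: " + dumpWork.getListFilesOnOutputBehaviour());
      System.out.println("load reads _files: " + loadWork.getReadListFromInput());
    }
  }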

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..54b2bd1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.*;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestReplicationSemanticAnalyzer {
+  static QueryState queryState;
+  static HiveConf conf;
+  static String defaultDB = "default";
+  static String tblName = "testReplSA";
+  static ArrayList<String> cols =  new ArrayList<String>(Arrays.asList("col1", "col2"));
+  ParseDriver pd;
+  SemanticAnalyzer sA;
+
+
+  @BeforeClass
+  public static void initialize() throws HiveException {
+    queryState = new QueryState(new HiveConf(SemanticAnalyzer.class));
+    conf = queryState.getConf();
+    conf.set("hive.security.authorization.manager", "");
+    SessionState.start(conf);
+    Hive hiveDb = Hive.get(conf);
+    hiveDb.createTable(defaultDB + "." + tblName, cols, null, OrcInputFormat.class, OrcOutputFormat.class);
+    Table t = hiveDb.getTable(tblName);
+  }
+
+  @AfterClass
+  public static void teardown() throws HiveException {
+  }
+
+  @Test
+  public void testReplDumpParse() throws Exception {
+    ParseDriver pd = new ParseDriver();
+    String fromEventId = "100";
+    String toEventId = "200";
+    String batchSize = "50";
+    ASTNode root;
+    ASTNode child;
+
+    String query = "repl dump " + defaultDB;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_DUMP");
+    assertEquals(root.getChildCount(), 1);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl dump " + defaultDB + "." + tblName;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 2);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    root =  (ASTNode) root.getChild(2);
+    assertEquals(root.getText(), "TOK_FROM");
+    assertEquals(root.getChildCount(), 1);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), fromEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId;
+
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    root =  (ASTNode) root.getChild(2);
+    assertEquals(root.getText(), "TOK_FROM");
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), fromEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), "TOK_TO");
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(2);
+    assertEquals(child.getText(), toEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    query =
+        "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId
+            + " batch " + batchSize;
+
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    root =  (ASTNode) root.getChild(2);
+    assertEquals(root.getText(), "TOK_FROM");
+    assertEquals(root.getChildCount(), 5);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), fromEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), "TOK_TO");
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(2);
+    assertEquals(child.getText(), toEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(3);
+    assertEquals(child.getText(), "TOK_BATCH");
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(4);
+    assertEquals(child.getText(), batchSize);
+    assertEquals(child.getChildCount(), 0);
+  }
+
+  @Test
+  public void testReplLoadParse() throws Exception {
+    // FileSystem fs = FileSystem.get(conf);
+    ParseDriver pd = new ParseDriver();
+    ASTNode root;
+    ASTNode child;
+    String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+    Path dumpRoot = new Path(replRoot, "next");
+    System.out.println(replRoot);
+    System.out.println(dumpRoot);
+    String newDB = "default_bak";
+
+    String query = "repl load  from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_LOAD");
+    assertEquals(root.getChildCount(), 1);
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_LOAD");
+    assertEquals(root.getChildCount(), 2);
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+    assertEquals(child.getChildCount(), 0);
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), newDB);
+    assertEquals(child.getChildCount(), 0);
+  }
+
+  // TODO: add this test after repl dump analyze generates tasks
+  //@Test
+  public void testReplDumpAnalyze() throws Exception {
+
+  }
+
+  //@Test
+  public void testReplLoadAnalyze() throws Exception {
+    ParseDriver pd = new ParseDriver();
+    ASTNode root;
+    String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+    FileSystem fs = FileSystem.get(conf);
+    Path dumpRoot = new Path(replRoot, "next");
+    System.out.println(replRoot);
+    System.out.println(dumpRoot);
+    String newDB = "default_bak";
+
+    // First create a dump
+    String query = "repl dump " + defaultDB;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    ReplicationSemanticAnalyzer rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+    rs.analyze(root, new Context(conf));
+
+    // Then analyze load
+    query = "repl load  from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+    rs.analyze(root, new Context(conf));
+    List<Task<? extends Serializable>> roots = rs.getRootTasks();
+    assertEquals(1, roots.size());
+
+    query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+    rs.analyze(root, new Context(conf));
+    roots = rs.getRootTasks();
+    assertEquals(1, roots.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
index 03515ea..0caa42a 100644
--- a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
+++ b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
@@ -19,4 +19,4 @@ POSTHOOK: type: LOAD
 #### A masked pattern was here ####
 POSTHOOK: Output: default@exim_department
 #### A masked pattern was here ####
-FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile
+FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile,file


[2/2] hive git commit: HIVE-15151: Bootstrap support for replv2 (Sushanth Sowmyan reviewed by Vaibhav Gumashta)

Posted by vg...@apache.org.
HIVE-15151: Bootstrap support for replv2 (Sushanth Sowmyan reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/739ac3af
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/739ac3af
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/739ac3af

Branch: refs/heads/master
Commit: 739ac3af32dd74ed735dbc3f942abaa1c293e1cb
Parents: 532e6a7
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Sun Nov 13 20:13:38 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Sun Nov 13 20:13:38 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 250 +++++++++
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |   2 +
 .../hive/ql/parse/BaseSemanticAnalyzer.java     |  16 +-
 .../apache/hadoop/hive/ql/parse/EximUtil.java   | 145 ++++-
 .../hive/ql/parse/ExportSemanticAnalyzer.java   |  41 +-
 .../org/apache/hadoop/hive/ql/parse/HiveLexer.g |   4 +
 .../apache/hadoop/hive/ql/parse/HiveParser.g    |  38 +-
 .../hadoop/hive/ql/parse/IdentifiersParser.g    |   1 +
 .../hive/ql/parse/ImportSemanticAnalyzer.java   | 472 ++++++++--------
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |   2 +-
 .../hive/ql/parse/MetaDataExportListener.java   |   4 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 533 +++++++++++++++++++
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |  39 +-
 .../hive/ql/parse/SemanticAnalyzerFactory.java  |  10 +
 .../hadoop/hive/ql/plan/ReplCopyWork.java       | 119 +++++
 .../parse/TestReplicationSemanticAnalyzer.java  | 259 +++++++++
 .../exim_00_unsupported_schema.q.out            |   2 +-
 18 files changed, 1696 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b80f7b5..3a65c80 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -211,6 +211,7 @@ public class HiveConf extends Configuration {
    */
   public static final HiveConf.ConfVars[] metaVars = {
       HiveConf.ConfVars.METASTOREWAREHOUSE,
+      HiveConf.ConfVars.REPLDIR,
       HiveConf.ConfVars.METASTOREURIS,
       HiveConf.ConfVars.METASTORE_SERVER_PORT,
       HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES,
@@ -432,6 +433,8 @@ public class HiveConf extends Configuration {
         "HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. " +
         "For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, " +
         "with ${hive.scratch.dir.permission}."),
+    REPLDIR("hive.repl.rootdir","/user/hive/repl/",
+        "HDFS root dir for all replication dumps."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
@@ -2062,7 +2065,7 @@ public class HiveConf extends Configuration {
         "When true the HDFS location stored in the index file will be ignored at runtime.\n" +
         "If the data got moved or the name of the cluster got changed, the index data should still be usable."),
 
-    HIVE_EXIM_URI_SCHEME_WL("hive.exim.uri.scheme.whitelist", "hdfs,pfile",
+    HIVE_EXIM_URI_SCHEME_WL("hive.exim.uri.scheme.whitelist", "hdfs,pfile,file",
         "A comma separated list of acceptable URI schemes for import and export."),
     // temporary variable for testing. This is added just to turn off this feature in case of a bug in
     // deployment. It has not been documented in hive-default.xml intentionally, this should be removed
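
A minimal sketch of exercising the two conf changes above: hive.repl.rootdir controls where REPL DUMP writes its dump dirs, and the exim scheme whitelist now also accepts "file" so local-filesystem dumps (e.g. in tests) pass validation. The override value shown is just an example.

  import org.apache.hadoop.hive.conf.HiveConf;

  public class ReplConfSketch {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();

      // Root dir for all replication dumps (defaults to /user/hive/repl/ per this patch).
      conf.setVar(HiveConf.ConfVars.REPLDIR, "/user/hive/repl/");

      System.out.println("repl root: " + conf.getVar(HiveConf.ConfVars.REPLDIR));
      // Now "hdfs,pfile,file" - file:// URIs are accepted for export/import.
      System.out.println("exim schemes: " + conf.getVar(HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL));
    }
  }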

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
new file mode 100644
index 0000000..4c0f817
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
+import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.util.StringUtils;
+
+public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
+
+
+  private static final long serialVersionUID = 1L;
+
+  private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class);
+
+  public ReplCopyTask(){
+    super();
+  }
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    LOG.debug("ReplCopyTask.execute()");
+    FileSystem dstFs = null;
+    Path toPath = null;
+    try {
+      Path fromPath = work.getFromPath();
+      toPath = work.getToPath();
+
+      console.printInfo("Copying data from " + fromPath.toString(), " to "
+          + toPath.toString());
+
+      ReplCopyWork rwork = ((ReplCopyWork)work);
+
+      FileSystem srcFs = fromPath.getFileSystem(conf);
+      dstFs = toPath.getFileSystem(conf);
+
+      List<FileStatus> srcFiles = new ArrayList<FileStatus>();
+      FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
+      LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length));
+      if (! rwork.getReadListFromInput()){
+        if (srcs == null || srcs.length == 0) {
+          if (work.isErrorOnSrcEmpty()) {
+            console.printError("No files matching path: " + fromPath.toString());
+            return 3;
+          } else {
+            return 0;
+          }
+        }
+      } else {
+        LOG.debug("ReplCopyTask making sense of _files");
+        // Our input is probably the result of a _files listing, we should expand out _files.
+        srcFiles = filesInFileListing(srcFs,fromPath);
+        LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size()));
+        if (srcFiles == null){
+          if (work.isErrorOnSrcEmpty()) {
+            console.printError("No _files entry found on source: " + fromPath.toString());
+            return 5;
+          } else {
+            return 0;
+          }
+        }
+      }
+      // Add in all the lone filecopies expected as well - applies to
+      // both _files case stragglers and regular copies
+      if (srcs != null) {
+        srcFiles.addAll(Arrays.asList(srcs));
+      }
+      LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : srcFiles.size()));
+
+      boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+      if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
+        console.printError("Cannot make target directory: " + toPath.toString());
+        return 2;
+      }
+
+      BufferedWriter listBW = null;
+      if (rwork.getListFilesOnOutputBehaviour()){
+        Path listPath = new Path(toPath,"_files");
+        LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
+        if (dstFs.exists(listPath)){
+          console.printError("Cannot make target _files file:" + listPath.toString());
+          return 4;
+        }
+        listBW = new BufferedWriter(new OutputStreamWriter(dstFs.create(listPath)));
+        // TODO : verify that not specifying charset here does not bite us
+        // later(for cases where filenames have unicode chars)
+      }
+
+      for (FileStatus oneSrc : srcFiles) {
+        console.printInfo("Copying file: " + oneSrc.getPath().toString());
+        LOG.debug("Copying file: " + oneSrc.getPath().toString());
+        if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
+          FileSystem actualSrcFs = null;
+          if (rwork.getReadListFromInput()){
+            // TODO : filesystemcache prevents this from being a perf nightmare, but we
+            // should still probably follow up to see if we need to do something better here.
+            actualSrcFs = oneSrc.getPath().getFileSystem(conf);
+          } else {
+            actualSrcFs = srcFs;
+          }
+
+          LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
+          if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
+            false, // delete source
+            true, // overwrite destination
+            conf)) {
+          console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+              + "' to: '" + toPath.toString() + "'");
+          return 1;
+          }
+        }else{
+          LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
+          console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
+          listBW.write(oneSrc.getPath().toUri().toString() + "\n");
+        }
+      }
+
+      if (listBW != null){
+        listBW.close();
+      }
+
+      return 0;
+
+    } catch (Exception e) {
+      console.printError("Failed with exception " + e.getMessage(), "\n"
+          + StringUtils.stringifyException(e));
+      return (1);
+    }
+  }
+
+
+  private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
+      throws IOException {
+    Path fileListing = new Path(path, "_files");
+    LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri());
+    if (! fs.exists(fileListing)){
+      LOG.debug("ReplCopyTask : _files does not exist");
+      return null; // Returning null from this fn serves as an error condition;
+      // on success with nothing to return, we return an empty list instead.
+    }
+
+    List<FileStatus> ret = new ArrayList<FileStatus>();
+    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)));
+    // TODO : verify if skipping charset here is okay
+
+    String line = null;
+    while ( (line = br.readLine()) != null){
+      LOG.debug("ReplCopyTask :_filesReadLine:" + line);
+      Path p = new Path(line);
+      FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit
+      ret.add(srcFs.getFileStatus(p));
+      // Note - we need srcFs rather than fs, because the _files listing may contain files
+      // from a different filesystem than the one the _files file itself was loaded from.
+      // Currently, it is possible, e.g., to do REPL LOAD hdfs://<ip>/dir/ while the _files
+      // in it contains hdfs://<name>/ entries, or vice-versa, or a mix of both, and resolving
+      // those entries against the wrong filesystem causes errors.
+      // TODO: revisit close to the end of replv2 dev to see if this assumption still holds,
+      // and if not, optimize.
+    }
+
+    return ret;
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.COPY;
+    // there's no extensive need for this to have its own type - it mirrors
+    // the intent of copy enough. This might change later, though.
+  }
+
+  @Override
+  public String getName() {
+    return "REPL_COPY";
+  }
+
+  public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+    Task<?> copyTask = null;
+    LOG.debug("ReplCopyTask:getLoadCopyTask: "+srcPath + "=>" + dstPath);
+    if (replicationSpec.isInReplicationScope()){
+      ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+      LOG.debug("ReplCopyTask:\trcwork");
+      if (replicationSpec.isLazy()){
+        LOG.debug("ReplCopyTask:\tlazy");
+        rcwork.setReadListFromInput(true);
+      }
+      copyTask = TaskFactory.get(rcwork, conf);
+    } else {
+      LOG.debug("ReplCopyTask:\tcwork");
+      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+    }
+    return copyTask;
+  }
+
+  public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+    Task<?> copyTask = null;
+    LOG.debug("ReplCopyTask:getDumpCopyTask: "+srcPath + "=>" + dstPath);
+    if (replicationSpec.isInReplicationScope()){
+      ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+      LOG.debug("ReplCopyTask:\trcwork");
+      if (replicationSpec.isLazy()){
+        LOG.debug("ReplCopyTask:\tlazy");
+        rcwork.setListFilesOnOutputBehaviour(true);
+      }
+      copyTask = TaskFactory.get(rcwork, conf);
+    } else {
+      LOG.debug("ReplCopyTask:\tcwork");
+      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+    }
+    return copyTask;
+  }
+
+}
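
A rough sketch of how the two factory methods above are meant to be wired together; the spec is assumed to be in replication scope with lazy copy enabled, and every path below is a hypothetical placeholder:

    // Minimal sketch, assuming `spec` is in replication scope and lazy, and `conf`
    // is the session HiveConf; paths are made up for illustration.
    static void wireReplCopyTasks(ReplicationSpec spec, HiveConf conf) {
      // Dump side: instead of copying table data, the task writes an _files listing
      // (one fully-qualified source URI per line) under the dump directory.
      Path tableData = new Path("hdfs://nn1/warehouse/db1/t1");
      Path dumpDir = new Path("hdfs://nn1/repl/dump/db1/t1/data");
      Task<?> dumpTask = ReplCopyTask.getDumpCopyTask(spec, tableData, dumpDir, conf);

      // Load side: the task reads dumpDir/_files and copies each listed file
      // (resolved against its own filesystem) into the target directory.
      Path targetDir = new Path("hdfs://nn2/warehouse/db1/t1");
      Task<?> loadTask = ReplCopyTask.getLoadCopyTask(spec, dumpDir, targetDir, conf);
    }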

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 14fd61a..d61a460 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -77,6 +78,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<MoveWork>(MoveWork.class, MoveTask.class));
     taskvec.add(new TaskTuple<FetchWork>(FetchWork.class, FetchTask.class));
     taskvec.add(new TaskTuple<CopyWork>(CopyWork.class, CopyTask.class));
+    taskvec.add(new TaskTuple<ReplCopyWork>(ReplCopyWork.class, ReplCopyTask.class));
     taskvec.add(new TaskTuple<DDLWork>(DDLWork.class, DDLTask.class));
     taskvec.add(new TaskTuple<FunctionWork>(FunctionWork.class,
         FunctionTask.class));
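
For reference, the registration above simply means that resolving a ReplCopyWork through the factory yields a ReplCopyTask; a minimal sketch, where srcPath, dstPath and conf are placeholders from the caller:

    ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
    Task<? extends Serializable> task = TaskFactory.get(rcwork, conf);
    // task is a ReplCopyTask, just as a plain CopyWork would resolve to a CopyTask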

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index ffb6ae3..7b63c52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -1607,8 +1607,12 @@ public abstract class BaseSemanticAnalyzer {
   }
 
   protected WriteEntity toWriteEntity(Path location) throws SemanticException {
+    return toWriteEntity(location,conf);
+  }
+
+  public static WriteEntity toWriteEntity(Path location, HiveConf conf) throws SemanticException {
     try {
-      Path path = tryQualifyPath(location);
+      Path path = tryQualifyPath(location,conf);
       return new WriteEntity(path, FileUtils.isLocalFile(conf, path.toUri()));
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -1620,8 +1624,12 @@ public abstract class BaseSemanticAnalyzer {
   }
 
   protected ReadEntity toReadEntity(Path location) throws SemanticException {
+    return toReadEntity(location, conf);
+  }
+
+  public static ReadEntity toReadEntity(Path location, HiveConf conf) throws SemanticException {
     try {
-      Path path = tryQualifyPath(location);
+      Path path = tryQualifyPath(location, conf);
       return new ReadEntity(path, FileUtils.isLocalFile(conf, path.toUri()));
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -1629,6 +1637,10 @@ public abstract class BaseSemanticAnalyzer {
   }
 
   private Path tryQualifyPath(Path path) throws IOException {
+    return tryQualifyPath(path,conf);
+  }
+
+  public static Path tryQualifyPath(Path path, HiveConf conf) throws IOException {
     try {
       return path.getFileSystem(conf).makeQualified(path);
     } catch (IOException e) {
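
A short sketch of how the new static overloads can be used from code that is not itself a BaseSemanticAnalyzer; the path below is a placeholder:

    HiveConf conf = new HiveConf();
    Path dumpPath = new Path("hdfs://nn1/repl/dump/db1");
    // Both helpers qualify the path against its filesystem and throw
    // SemanticException if that fails.
    ReadEntity in = BaseSemanticAnalyzer.toReadEntity(dumpPath, conf);
    WriteEntity out = BaseSemanticAnalyzer.toWriteEntity(dumpPath, conf);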

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 167f7a5..a0d492d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -20,6 +20,14 @@ package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.Function;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,10 +53,12 @@ import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -63,8 +73,73 @@ import java.util.TreeMap;
  */
 public class EximUtil {
 
+  public static final String METADATA_NAME="_metadata";
+
   private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
 
+  /**
+   * Wrapper class that bundles the common BaseSemanticAnalyzer non-static members
+   * so they can be passed into static generic methods without making the function
+   * signatures overwhelming by passing each member into every function separately.
+   *
+   * Note, however, that since this is constructed with args passed in,
+   * parts of the context, such as the tasks or inputs, might have been
+   * overridden with temporary context values, rather than being exactly
+   * 1:1 equivalent to BaseSemanticAnalyzer.getRootTasks() or BSA.getInputs().
+   */
+  public static class SemanticAnalyzerWrapperContext {
+    private HiveConf conf;
+    private Hive db;
+    private HashSet<ReadEntity> inputs;
+    private HashSet<WriteEntity> outputs;
+    private List<Task<? extends Serializable>> tasks;
+    private Logger LOG;
+    private Context ctx;
+
+    public HiveConf getConf() {
+      return conf;
+    }
+
+    public Hive getHive() {
+      return db;
+    }
+
+    public HashSet<ReadEntity> getInputs() {
+      return inputs;
+    }
+
+    public HashSet<WriteEntity> getOutputs() {
+      return outputs;
+    }
+
+    public List<Task<? extends Serializable>> getTasks() {
+      return tasks;
+    }
+
+    public Logger getLOG() {
+      return LOG;
+    }
+
+    public Context getCtx() {
+      return ctx;
+    }
+
+    public SemanticAnalyzerWrapperContext(HiveConf conf, Hive db,
+                                          HashSet<ReadEntity> inputs,
+                                          HashSet<WriteEntity> outputs,
+                                          List<Task<? extends Serializable>> tasks,
+                                          Logger LOG, Context ctx){
+      this.conf = conf;
+      this.db = db;
+      this.inputs = inputs;
+      this.outputs = outputs;
+      this.tasks = tasks;
+      this.LOG = LOG;
+      this.ctx = ctx;
+    }
+  }
+
+
   private EximUtil() {
   }
 
@@ -162,10 +237,41 @@ public class EximUtil {
   }
 
   /* major version number should match for backward compatibility */
-  public static final String METADATA_FORMAT_VERSION = "0.1";
+  public static final String METADATA_FORMAT_VERSION = "0.2";
+
   /* If null, then the major version number should match */
   public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null;
 
+  public static void createDbExportDump(
+      FileSystem fs, Path metadataPath, Database dbObj,
+      ReplicationSpec replicationSpec) throws IOException, SemanticException {
+
+    // WARNING NOTE : at this point, createDbExportDump is used only when the ReplicationSpec
+    // is in replication scope. If we later make this work for non-repl cases, this logic will
+    // need re-analysis. Also, this uses replv2 semantics, i.e. listFiles laziness (no copy at export time).
+
+    OutputStream out = fs.create(metadataPath);
+    JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out);
+    jgen.writeStartObject();
+    jgen.writeStringField("version",METADATA_FORMAT_VERSION);
+    dbObj.putToParameters(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState());
+
+    if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
+      jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
+    }
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    try {
+      jgen.writeStringField("db", serializer.toString(dbObj, "UTF-8"));
+    } catch (TException e) {
+      throw new SemanticException(
+          ErrorMsg.ERROR_SERIALIZE_METASTORE
+              .getMsg(), e);
+    }
+
+    jgen.writeEndObject();
+    jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
+  }
+
   public static void createExportDump(FileSystem fs, Path metadataPath,
       org.apache.hadoop.hive.ql.metadata.Table tableHandle,
       Iterable<org.apache.hadoop.hive.ql.metadata.Partition> partitions,
@@ -255,19 +361,25 @@ public class EximUtil {
    * Utility class to help return complex value from readMetaData function
    */
   public static class ReadMetaData {
+    private final Database db;
     private final Table table;
     private final Iterable<Partition> partitions;
     private final ReplicationSpec replicationSpec;
 
     public ReadMetaData(){
-      this(null,null,new ReplicationSpec());
+      this(null,null,null,new ReplicationSpec());
     }
-    public ReadMetaData(Table table, Iterable<Partition> partitions, ReplicationSpec replicationSpec){
+    public ReadMetaData(Database db, Table table, Iterable<Partition> partitions, ReplicationSpec replicationSpec){
+      this.db = db;
       this.table = table;
       this.partitions = partitions;
       this.replicationSpec = replicationSpec;
     }
 
+    public Database getDatabase(){
+      return db;
+    }
+
     public Table getTable() {
       return table;
     }
@@ -298,12 +410,21 @@ public class EximUtil {
       String version = jsonContainer.getString("version");
       String fcversion = getJSONStringEntry(jsonContainer, "fcversion");
       checkCompatibility(version, fcversion);
+
+      String dbDesc = getJSONStringEntry(jsonContainer, "db");
       String tableDesc = getJSONStringEntry(jsonContainer,"table");
+      TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+
+      Database db = null;
+      if (dbDesc != null){
+        db = new Database();
+        deserializer.deserialize(db, dbDesc, "UTF-8");
+      }
+
       Table table = null;
       List<Partition> partitionsList = null;
       if (tableDesc != null){
         table = new Table();
-        TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
         deserializer.deserialize(table, tableDesc, "UTF-8");
         // TODO : jackson-streaming-iterable-redo this
         JSONArray jsonPartitions = new JSONArray(jsonContainer.getString("partitions"));
@@ -316,7 +437,7 @@ public class EximUtil {
         }
       }
 
-      return new ReadMetaData(table, partitionsList,readReplicationSpec(jsonContainer));
+      return new ReadMetaData(db, table, partitionsList,readReplicationSpec(jsonContainer));
     } catch (JSONException e) {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
     } catch (TException e) {
@@ -438,4 +559,18 @@ public class EximUtil {
     }
     return true;
   }
+
+  public static PathFilter getDirectoryFilter(final FileSystem fs) {
+    // TODO : isn't there a prior impl of an isDirectory utility PathFilter so users don't have to write their own?
+    return new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        try {
+          return fs.isDirectory(p);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
 }
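
A rough sketch of the round trip through the new db-level dump helpers; fs, dumpRoot, dbObj and replicationSpec are placeholders supplied by the caller:

    // Write the db-level _metadata file for a bootstrap dump.
    Path dbMetadata = new Path(dumpRoot, EximUtil.METADATA_NAME);
    EximUtil.createDbExportDump(fs, dbMetadata, dbObj, replicationSpec);
    // The file holds a JSON object with "version" (now "0.2"), an optional
    // "fcversion", and "db" carrying the Thrift-JSON serialized Database whose
    // parameters include the current replication state id.

    // Read it back on the load side; getDatabase() is non-null only for db dumps.
    EximUtil.ReadMetaData rv = EximUtil.readMetaData(fs, dbMetadata);
    Database replicatedDb = rv.getDatabase();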

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 475f2c9..f61274b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -22,6 +22,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
 
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,16 +31,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.CopyWork;
+import org.slf4j.Logger;
 
 /**
  * ExportSemanticAnalyzer.
@@ -88,6 +93,18 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
+    // All parsing is done, we're now good to start the export process.
+    prepareExport(ast, toURI, ts, replicationSpec, db, conf, ctx, rootTasks, inputs, outputs, LOG);
+
+  }
+
+  // FIXME : Move to EximUtil - it's okay for this to stay here for a little while more till we finalize the statics
+  public static void prepareExport(
+      ASTNode ast, URI toURI, TableSpec ts,
+      ReplicationSpec replicationSpec, Hive db, HiveConf conf,
+      Context ctx, List<Task<? extends Serializable>> rootTasks, HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+      Logger LOG) throws SemanticException {
+
     if (ts != null) {
       try {
         EximUtil.validateTable(ts.tableHandle);
@@ -109,6 +126,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     try {
+
       FileSystem fs = FileSystem.get(toURI, conf);
       Path toPath = new Path(toURI.getScheme(), toURI.getAuthority(), toURI.getPath());
       try {
@@ -156,12 +174,12 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
       EximUtil.createExportDump(
           FileSystem.getLocal(conf),
           path,
-          (ts != null ? ts.tableHandle: null),
+          (ts != null ? ts.tableHandle : null),
           partitions,
           replicationSpec);
 
-      Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
-          path, new Path(toURI), false), conf);
+      Task<? extends Serializable> rTask = ReplCopyTask.getDumpCopyTask(replicationSpec, path, new Path(toURI), conf);
+
       rootTasks.add(rTask);
       LOG.debug("_metadata file written into " + path.toString()
           + " and then copied to " + toURI.toString());
@@ -177,23 +195,22 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
         for (Partition partition : partitions) {
           Path fromPath = partition.getDataLocation();
           Path toPartPath = new Path(parentPath, partition.getName());
-          Task<? extends Serializable> rTask = TaskFactory.get(
-              new CopyWork(fromPath, toPartPath, false),
-              conf);
+          Task<? extends Serializable> rTask =
+              ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
           rootTasks.add(rTask);
           inputs.add(new ReadEntity(partition));
         }
       } else {
         Path fromPath = ts.tableHandle.getDataLocation();
         Path toDataPath = new Path(parentPath, "data");
-        Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
-            fromPath, toDataPath, false), conf);
+        Task<? extends Serializable> rTask =
+            ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
         rootTasks.add(rTask);
         inputs.add(new ReadEntity(ts.tableHandle));
       }
-      outputs.add(toWriteEntity(parentPath));
+      outputs.add(toWriteEntity(parentPath,conf));
     }
-
   }
 
+
 }
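
The refactored prepareExport can now be driven by the repl dump path as well as by EXPORT; a hedged sketch of such a call, where every argument is a placeholder taken from the caller's own context:

    ExportSemanticAnalyzer.prepareExport(
        ast, tableDumpUri, tableSpec, replicationSpec,   // what to export, and how
        db, conf, ctx,                                   // session-level context
        rootTasks, inputs, outputs, LOG);                // collectors owned by the caller
    // In replication scope with a lazy spec, the generated copy tasks only write
    // _files listings instead of moving table data at export time.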

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index b467c51..4357328 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -338,6 +338,10 @@ KW_EXTRACT: 'EXTRACT';
 KW_FLOOR: 'FLOOR';
 KW_MERGE: 'MERGE';
 KW_MATCHED: 'MATCHED';
+KW_REPL: 'REPL';
+KW_DUMP: 'DUMP';
+KW_BATCH: 'BATCH';
+KW_STATUS: 'STATUS';
 
 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index bd53a36..55915a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -388,7 +388,11 @@ TOK_MERGE;
 TOK_MATCHED;
 TOK_NOT_MATCHED;
 TOK_UPDATE;
-TOK_DELETE;
+TOK_DELETE;TOK_REPL_DUMP;
+TOK_REPL_LOAD;
+TOK_REPL_STATUS;
+TOK_BATCH;
+TOK_TO;
 }
 
 
@@ -738,6 +742,9 @@ execStatement
     | loadStatement
     | exportStatement
     | importStatement
+    | replDumpStatement
+    | replLoadStatement
+    | replStatusStatement
     | ddlStatement
     | deleteStatement
     | updateStatement
@@ -779,6 +786,35 @@ importStatement
     -> ^(TOK_IMPORT $path $tab? $ext? tableLocation?)
     ;
 
+replDumpStatement
+@init { pushMsg("replication dump statement", state); }
+@after { popMsg(state); }
+      : KW_REPL KW_DUMP
+        (dbName=identifier) (DOT tblName=identifier)?
+        (KW_FROM (eventId=Number)
+          (KW_TO (rangeEnd=Number))?
+          (KW_BATCH (batchSize=Number))?
+        )?
+    -> ^(TOK_REPL_DUMP $dbName $tblName? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_BATCH $batchSize)?)? )
+    ;
+
+replLoadStatement
+@init { pushMsg("replication load statement", state); }
+@after { popMsg(state); }
+      : KW_REPL KW_LOAD
+        ((dbName=identifier) (DOT tblName=identifier)?)?
+        KW_FROM (path=StringLiteral)
+      -> ^(TOK_REPL_LOAD $path $dbName? $tblName?)
+      ;
+
+replStatusStatement
+@init { pushMsg("replication status statement", state); }
+@after { popMsg(state); }
+      : KW_REPL KW_STATUS
+        (dbName=identifier) (DOT tblName=identifier)?
+      -> ^(TOK_REPL_STATUS $dbName $tblName?)
+      ;
+
 ddlStatement
 @init { pushMsg("ddl statement", state); }
 @after { popMsg(state); }
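
For orientation, the statements the new rules are intended to accept look roughly like the following (database, table and path names are made up), each rewritten into a TOK_REPL_DUMP / TOK_REPL_LOAD / TOK_REPL_STATUS tree as above:

    // Hypothetical statements exercising the new grammar rules.
    String[] replStatements = {
        "REPL DUMP salesdb",                                  // bootstrap dump of a db
        "REPL DUMP salesdb.orders FROM 100 TO 200 BATCH 10",  // event-range dump of a table
        "REPL LOAD salesdb_copy FROM '/repl/dump/salesdb'",   // load a dump directory
        "REPL STATUS salesdb_copy"                            // query replication status
    };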

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index aa92739..a82083b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -762,6 +762,7 @@ nonReserved
     | KW_KEY
     | KW_MERGE
     | KW_MATCHED
+    | KW_REPL | KW_DUMP | KW_BATCH | KW_STATUS
 ;
 
 //The following SQL2011 reserved keywords are used as function name only, but not as identifiers.

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 9986fcf..3420efd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -48,16 +49,17 @@ import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
-import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -73,12 +75,14 @@ import org.apache.hadoop.mapred.OutputFormat;
  */
 public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
-  public static final String METADATA_NAME="_metadata";
-
   public ImportSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
   }
 
+  // FIXME : Note that the tableExists flag as used by Auth is kind of a hack and
+  // assumes only 1 table will ever be imported - this assumption is broken by
+  // REPL LOAD. We need to fix this, maybe by continuing the hack and replacing
+  // the flag with a map, or maybe by coming up with a better API for it.
   private boolean tableExists = false;
 
   public boolean existsTable() {
@@ -92,14 +96,16 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       boolean isLocationSet = false;
       boolean isExternalSet = false;
-      boolean isTableSet = false;
-      boolean isDbNameSet = false;
       boolean isPartSpecSet = false;
       String parsedLocation = null;
       String parsedTableName = null;
       String parsedDbName = null;
       LinkedHashMap<String, String> parsedPartSpec = new LinkedHashMap<String, String>();
 
+      // waitOnCreateDb determines whether or not non-existence of
+      // db is an error. For regular imports, it is.
+      boolean waitOnCreateDb = false;
+
       for (int i = 1; i < ast.getChildCount(); ++i){
         ASTNode child = (ASTNode) ast.getChild(i);
         switch (child.getToken().getType()){
@@ -111,14 +117,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             parsedLocation = EximUtil.relativeToAbsolutePath(conf, unescapeSQLString(child.getChild(0).getText()));
             break;
           case HiveParser.TOK_TAB:
-            isTableSet = true;
             ASTNode tableNameNode = (ASTNode) child.getChild(0);
             Map.Entry<String,String> dbTablePair = getDbTableNamePair(tableNameNode);
             parsedDbName = dbTablePair.getKey();
             parsedTableName = dbTablePair.getValue();
-            if (parsedDbName != null){
-              isDbNameSet = true;
-            }
             // get partition metadata if partition specified
             if (child.getChildCount() == 2) {
               ASTNode partspec = (ASTNode) child.getChild(1);
@@ -130,111 +132,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
 
       // parsing statement is now done, on to logic.
+      tableExists = prepareImport(
+          isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb,
+          parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
+          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx));
 
-      // initialize load path
-      URI fromURI = EximUtil.getValidatedURI(conf, stripQuotes(fromTree.getText()));
-      FileSystem fs = FileSystem.get(fromURI, conf);
-      Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
-      inputs.add(toReadEntity(fromPath));
-
-      EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
-      try {
-        rv =  EximUtil.readMetaData(fs, new Path(fromPath, METADATA_NAME));
-      } catch (IOException e) {
-        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
-      }
-
-      ReplicationSpec replicationSpec = rv.getReplicationSpec();
-      if (replicationSpec.isNoop()){
-        // nothing to do here, silently return.
-        return;
-      }
-
-      String dbname = SessionState.get().getCurrentDatabase();
-      if (isDbNameSet){
-        // If the parsed statement contained a db.tablename specification, prefer that.
-        dbname = parsedDbName;
-      }
-
-      // Create table associated with the import
-      // Executed if relevant, and used to contain all the other details about the table if not.
-      CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
-
-      if (isExternalSet){
-        tblDesc.setExternal(isExternalSet);
-        // This condition-check could have been avoided, but to honour the old
-        // default of not calling if it wasn't set, we retain that behaviour.
-        // TODO:cleanup after verification that the outer if isn't really needed here
-      }
-
-      if (isLocationSet){
-        tblDesc.setLocation(parsedLocation);
-        inputs.add(toReadEntity(parsedLocation));
-      }
-
-      if (isTableSet){
-        tblDesc.setTableName(parsedTableName);
-      }
-
-      List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
-      Iterable<Partition> partitions = rv.getPartitions();
-      for (Partition partition : partitions) {
-        // TODO: this should ideally not create AddPartitionDesc per partition
-        AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
-        partitionDescs.add(partsDesc);
-      }
-
-      if (isPartSpecSet){
-        // The import specification asked for only a particular partition to be loaded
-        // We load only that, and ignore all the others.
-        boolean found = false;
-        for (Iterator<AddPartitionDesc> partnIter = partitionDescs
-            .listIterator(); partnIter.hasNext();) {
-          AddPartitionDesc addPartitionDesc = partnIter.next();
-          if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
-            found = true;
-          } else {
-            partnIter.remove();
-          }
-        }
-        if (!found) {
-          throw new SemanticException(
-              ErrorMsg.INVALID_PARTITION
-                  .getMsg(" - Specified partition not found in import directory"));
-        }
-      }
-
-      if (tblDesc.getTableName() == null) {
-        // Either we got the tablename from the IMPORT statement (first priority)
-        // or from the export dump.
-        throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
-      } else {
-        conf.set("import.destination.table", tblDesc.getTableName());
-        for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-          addPartitionDesc.setTableName(tblDesc.getTableName());
-        }
-      }
-
-      Warehouse wh = new Warehouse(conf);
-      Table table = tableIfExists(tblDesc);
-
-      if (table != null){
-        checkTable(table, tblDesc,replicationSpec);
-        LOG.debug("table " + tblDesc.getTableName() + " exists: metadata checked");
-        tableExists = true;
-      }
-
-      if (!replicationSpec.isInReplicationScope()){
-        createRegularImportTasks(
-            rootTasks, tblDesc, partitionDescs,
-            isPartSpecSet, replicationSpec, table,
-            fromURI, fs, wh);
-      } else {
-        createReplImportTasks(
-            rootTasks, tblDesc, partitionDescs,
-            isPartSpecSet, replicationSpec, table,
-            fromURI, fs, wh);
-      }
     } catch (SemanticException e) {
       throw e;
     } catch (Exception e) {
@@ -265,7 +167,123 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private AddPartitionDesc getBaseAddPartitionDescFromPartition(
+  public static boolean prepareImport(
+      boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb,
+      String parsedLocation, String parsedTableName, String parsedDbName,
+      LinkedHashMap<String, String> parsedPartSpec,
+      String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
+  ) throws IOException, MetaException, HiveException, URISyntaxException {
+
+    // initialize load path
+    URI fromURI = EximUtil.getValidatedURI(x.getConf(), stripQuotes(fromLocn));
+    Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+    FileSystem fs = FileSystem.get(fromURI, x.getConf());
+    x.getInputs().add(toReadEntity(fromPath, x.getConf()));
+
+    EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+    try {
+      rv =  EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+    } catch (IOException e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+
+    ReplicationSpec replicationSpec = rv.getReplicationSpec();
+    if (replicationSpec.isNoop()){
+      // nothing to do here, silently return.
+      return false;
+    }
+
+    String dbname = SessionState.get().getCurrentDatabase();
+    if ((parsedDbName !=null) && (!parsedDbName.isEmpty())){
+      // If the parsed statement contained a db.tablename specification, prefer that.
+      dbname = parsedDbName;
+    }
+
+    // Create table associated with the import
+    // Executed if relevant, and used to contain all the other details about the table if not.
+    CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
+
+    if (isExternalSet){
+      tblDesc.setExternal(isExternalSet);
+      // This condition-check could have been avoided, but to honour the old
+      // default of not calling if it wasn't set, we retain that behaviour.
+      // TODO:cleanup after verification that the outer if isn't really needed here
+    }
+
+    if (isLocationSet){
+      tblDesc.setLocation(parsedLocation);
+      x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
+    }
+
+    if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
+      tblDesc.setTableName(parsedTableName);
+    }
+
+    List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
+    Iterable<Partition> partitions = rv.getPartitions();
+    for (Partition partition : partitions) {
+      // TODO: this should ideally not create AddPartitionDesc per partition
+      AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
+      partitionDescs.add(partsDesc);
+    }
+
+    if (isPartSpecSet){
+      // The import specification asked for only a particular partition to be loaded
+      // We load only that, and ignore all the others.
+      boolean found = false;
+      for (Iterator<AddPartitionDesc> partnIter = partitionDescs
+          .listIterator(); partnIter.hasNext();) {
+        AddPartitionDesc addPartitionDesc = partnIter.next();
+        if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
+          found = true;
+        } else {
+          partnIter.remove();
+        }
+      }
+      if (!found) {
+        throw new SemanticException(
+            ErrorMsg.INVALID_PARTITION
+                .getMsg(" - Specified partition not found in import directory"));
+      }
+    }
+
+    if (tblDesc.getTableName() == null) {
+      // Either we got the tablename from the IMPORT statement (first priority)
+      // or from the export dump.
+      throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
+    } else {
+      x.getConf().set("import.destination.table", tblDesc.getTableName());
+      for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+        addPartitionDesc.setTableName(tblDesc.getTableName());
+      }
+    }
+
+    Warehouse wh = new Warehouse(x.getConf());
+    Table table = tableIfExists(tblDesc, x.getHive());
+    boolean tableExists = false;
+
+    if (table != null){
+      checkTable(table, tblDesc,replicationSpec, x.getConf());
+      x.getLOG().debug("table " + tblDesc.getTableName() + " exists: metadata checked");
+      tableExists = true;
+    }
+
+    if (!replicationSpec.isInReplicationScope()){
+      createRegularImportTasks(
+          tblDesc, partitionDescs,
+          isPartSpecSet, replicationSpec, table,
+          fromURI, fs, wh, x);
+    } else {
+      createReplImportTasks(
+          tblDesc, partitionDescs,
+          isPartSpecSet, replicationSpec, waitOnCreateDb, table,
+          fromURI, fs, wh, x);
+    }
+    return tableExists;
+  }
+
+  private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
       Path fromPath, String dbname, CreateTableDesc tblDesc, Partition partition) throws MetaException {
     AddPartitionDesc partsDesc = new AddPartitionDesc(dbname, tblDesc.getTableName(),
         EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
@@ -284,7 +302,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     return partsDesc;
   }
 
-  private CreateTableDesc getBaseCreateTableDescFromTable(String dbName,
+  private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName,
       org.apache.hadoop.hive.metastore.api.Table table) {
     if ((table.getPartitionKeys() == null) || (table.getPartitionKeys().size() == 0)){
       table.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
@@ -318,94 +336,95 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     return tblDesc;
   }
 
-  private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath) {
+  private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
+                            ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
     Path dataPath = new Path(fromURI.toString(), "data");
-    Path tmpPath = ctx.getExternalTmpPath(tgtPath);
-    Task<?> copyTask = TaskFactory.get(new CopyWork(dataPath,
-       tmpPath, false), conf);
+    Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
+    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
         Utilities.getTableDesc(table), new TreeMap<String, String>(),
         replace);
-    Task<?> loadTableTask = TaskFactory.get(new MoveWork(getInputs(),
-        getOutputs(), loadTableWork, null, false), conf);
+    Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
+        x.getOutputs(), loadTableWork, null, false), x.getConf());
     copyTask.addDependentTask(loadTableTask);
-    rootTasks.add(copyTask);
+    x.getTasks().add(copyTask);
     return loadTableTask;
   }
 
-  private Task<?> createTableTask(CreateTableDesc tableDesc){
+  private static Task<?> createTableTask(CreateTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
     return TaskFactory.get(new DDLWork(
-        getInputs(),
-        getOutputs(),
+        x.getInputs(),
+        x.getOutputs(),
         tableDesc
-    ), conf);
+    ), x.getConf());
   }
 
-  private Task<?> dropTableTask(Table table){
+  private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){
     return TaskFactory.get(new DDLWork(
-        getInputs(),
-        getOutputs(),
+        x.getInputs(),
+        x.getOutputs(),
         new DropTableDesc(table.getTableName(), null, true, true, null)
-    ), conf);
+    ), x.getConf());
   }
 
-  private Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc) {
+  private static Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc,
+      EximUtil.SemanticAnalyzerWrapperContext x) {
     tableDesc.setReplaceMode(true);
     return TaskFactory.get(new DDLWork(
-        getInputs(),
-        getOutputs(),
+        x.getInputs(),
+        x.getOutputs(),
         tableDesc
-    ), conf);
+    ), x.getConf());
   }
 
-  private Task<? extends Serializable> alterSinglePartition(
+  private static Task<? extends Serializable> alterSinglePartition(
       URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
-      ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn) {
+      ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
+      EximUtil.SemanticAnalyzerWrapperContext x) {
     addPartitionDesc.setReplaceMode(true);
     addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
     return TaskFactory.get(new DDLWork(
-        getInputs(),
-        getOutputs(),
+        x.getInputs(),
+        x.getOutputs(),
         addPartitionDesc
-    ), conf);
+    ), x.getConf());
   }
 
-
- private Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
+ private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
       Table table, Warehouse wh,
-      AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec)
+      AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
-      LOG.debug("Importing in-place: adding AddPart for partition "
+      x.getLOG().debug("Importing in-place: adding AddPart for partition "
           + partSpecToString(partSpec.getPartSpec()));
       // addPartitionDesc already has the right partition location
-      Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(),
-          getOutputs(), addPartitionDesc), conf);
+      Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+          x.getOutputs(), addPartitionDesc), x.getConf());
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
-      fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec);
-      LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+      fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+      x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
           + partSpecToString(partSpec.getPartSpec())
           + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());
-      Path tmpPath = ctx.getExternalTmpPath(tgtLocation);
-      Task<?> copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation),
-          tmpPath, false), conf);
-      Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(),
-          getOutputs(), addPartitionDesc), conf);
+      Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation);
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+          replicationSpec, new Path(srcLocation), tmpPath, x.getConf());
+      Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+          x.getOutputs(), addPartitionDesc), x.getConf());
       LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
           Utilities.getTableDesc(table),
           partSpec.getPartSpec(), true);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
-          getInputs(), getOutputs(), loadTableWork, null, false),
-          conf);
+          x.getInputs(), x.getOutputs(), loadTableWork, null, false),
+          x.getConf());
       copyTask.addDependentTask(loadPartTask);
       addPartTask.addDependentTask(loadPartTask);
-      rootTasks.add(copyTask);
+      x.getTasks().add(copyTask);
       return addPartTask;
     }
   }
@@ -413,17 +432,18 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   /**
    * Helper method to set location properly in partSpec
    */
-  private void fixLocationInPartSpec(
+  private static void fixLocationInPartSpec(
       FileSystem fs, CreateTableDesc tblDesc, Table table,
       Warehouse wh, ReplicationSpec replicationSpec,
-      AddPartitionDesc.OnePartitionDesc partSpec) throws MetaException, HiveException, IOException {
+      AddPartitionDesc.OnePartitionDesc partSpec,
+      EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, HiveException, IOException {
     Path tgtPath = null;
     if (tblDesc.getLocation() == null) {
       if (table.getDataLocation() != null) {
         tgtPath = new Path(table.getDataLocation().toString(),
             Warehouse.makePartPath(partSpec.getPartSpec()));
       } else {
-        Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+        Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
         tgtPath = new Path(
             wh.getTablePath( parentDb, tblDesc.getTableName()),
             Warehouse.makePartPath(partSpec.getPartSpec()));
@@ -432,22 +452,23 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       tgtPath = new Path(tblDesc.getLocation(),
           Warehouse.makePartPath(partSpec.getPartSpec()));
     }
-    FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf);
-    checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec);
+    FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
+    checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
     partSpec.setLocation(tgtPath.toString());
   }
 
-  private void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec)
+  private static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec,
+                                        EximUtil.SemanticAnalyzerWrapperContext x)
       throws IOException, SemanticException {
     if (replicationSpec.isInReplicationScope()){
       // replication scope allows replacement, and does not require empty directories
       return;
     }
-    LOG.debug("checking emptiness of " + targetPath.toString());
+    x.getLOG().debug("checking emptiness of " + targetPath.toString());
     if (fs.exists(targetPath)) {
       FileStatus[] status = fs.listStatus(targetPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
       if (status.length > 0) {
-        LOG.debug("Files inc. " + status[0].getPath().toString()
+        x.getLOG().debug("Files inc. " + status[0].getPath().toString()
             + " found in path : " + targetPath.toString());
         throw new SemanticException(ErrorMsg.TABLE_DATA_EXISTS.getMsg());
       }
@@ -469,7 +490,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     return sb.toString();
   }
 
-  private void checkTable(Table table, CreateTableDesc tableDesc, ReplicationSpec replicationSpec)
+  private static void checkTable(Table table, CreateTableDesc tableDesc, ReplicationSpec replicationSpec, HiveConf conf)
       throws SemanticException, URISyntaxException {
     // This method gets called only in the scope that a destination table already exists, so
     // we're validating if the table is an appropriate destination to import into
@@ -681,25 +702,33 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   /**
    * Create tasks for regular import, no repl complexity
+   * @param tblDesc
+   * @param partitionDescs
+   * @param isPartSpecSet
+   * @param replicationSpec
+   * @param table
+   * @param fromURI
+   * @param fs
+   * @param wh
    */
-  private void createRegularImportTasks(
-      List<Task<? extends Serializable>> rootTasks,
+  private static void createRegularImportTasks(
       CreateTableDesc tblDesc,
       List<AddPartitionDesc> partitionDescs,
       boolean isPartSpecSet,
       ReplicationSpec replicationSpec,
-      Table table, URI fromURI, FileSystem fs, Warehouse wh)
+      Table table, URI fromURI, FileSystem fs, Warehouse wh, EximUtil.SemanticAnalyzerWrapperContext x)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     if (table != null){
       if (table.isPartitioned()) {
-        LOG.debug("table partitioned");
+        x.getLOG().debug("table partitioned");
 
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
-          if ((ptn = db.getPartition(table, partSpec, false)) == null) {
-            rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+          if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
+            x.getTasks().add(addSinglePartition(
+                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
           } else {
             throw new SemanticException(
                 ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -707,35 +736,35 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
 
       } else {
-        LOG.debug("table non-partitioned");
+        x.getLOG().debug("table non-partitioned");
         // ensure if destination is not empty only for regular import
         Path tgtPath = new Path(table.getDataLocation().toString());
-        FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf);
-        checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec);
-        loadTable(fromURI, table, false, tgtPath);
+        FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
+        checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
+        loadTable(fromURI, table, false, tgtPath, replicationSpec,x);
       }
       // Set this to read because we can't overwrite any existing partitions
-      outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
+      x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
     } else {
-      LOG.debug("table " + tblDesc.getTableName() + " does not exist");
+      x.getLOG().debug("table " + tblDesc.getTableName() + " does not exist");
 
-      Task<?> t = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tblDesc), conf);
+      Task<?> t = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), tblDesc), x.getConf());
       table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
-      Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+      Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
 
       // Since we are going to be creating a new table in a db, we should mark that db as a write entity
       // so that the auth framework can go to work there.
-      outputs.add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
+      x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
 
       if (isPartitioned(tblDesc)) {
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(
-              addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+              addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
         }
       } else {
-        LOG.debug("adding dependent CopyWork/MoveWork for table");
+        x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
         if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) {
-          LOG.debug("Importing in place, no emptiness check, no copying/loading");
+          x.getLOG().debug("Importing in place, no emptiness check, no copying/loading");
           Path dataPath = new Path(fromURI.toString(), "data");
           tblDesc.setLocation(dataPath.toString());
         } else {
@@ -745,23 +774,24 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           } else {
             tablePath = wh.getTablePath(parentDb, tblDesc.getTableName());
           }
-          FileSystem tgtFs = FileSystem.get(tablePath.toUri(), conf);
-          checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec);
-          t.addDependentTask(loadTable(fromURI, table, false, tablePath));
+          FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
+          checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x);
+          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x));
         }
       }
-      rootTasks.add(t);
+      x.getTasks().add(t);
     }
   }
 
   /**
    * Create tasks for repl import
    */
-  private void createReplImportTasks(
-      List<Task<? extends Serializable>> rootTasks,
+  private static void createReplImportTasks(
       CreateTableDesc tblDesc,
       List<AddPartitionDesc> partitionDescs,
-      boolean isPartSpecSet, ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh)
+      boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+      Table table, URI fromURI, FileSystem fs, Warehouse wh,
+      EximUtil.SemanticAnalyzerWrapperContext x)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     Task dr = null;
@@ -774,7 +804,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       // So, we check the repl.last.id of the destination, and if it's newer, we no-op. If it's older, we
       // drop and re-create.
       if (replicationSpec.allowReplacementInto(table)){
-        dr = dropTableTask(table);
+        dr = dropTableTask(table, x);
         lockType = WriteEntity.WriteType.DDL_EXCLUSIVE;
         table = null; // null it out so we go into the table re-create flow.
       } else {
@@ -782,12 +812,29 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
-    Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+    // Normally, on import, trying to create a table or a partition in a db that does not yet exist
+    // is an error condition. However, in the case of a REPL LOAD, it is possible that we are trying
+    // to create tasks to create a table inside a db that as-of-now does not exist, but there is
+    // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
+    // defaults and do not error out in that case.
+    Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
     if (parentDb == null){
-      throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
+      if (!waitOnCreateDb){
+        throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
+      }
     }
     if (tblDesc.getLocation() == null) {
-      tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString());
+      if (!waitOnCreateDb){
+        tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString());
+      } else {
+        tblDesc.setLocation(
+            wh.getDnsPath(new Path(
+                wh.getDefaultDatabasePath(tblDesc.getDatabaseName()),
+                MetaStoreUtils.encodeTableName(tblDesc.getTableName().toLowerCase())
+            )
+        ).toString());
+
+      }
     }
 
      /* Note: In the following section, Metadata-only import handling logic is
@@ -807,49 +854,52 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         lockType = WriteEntity.WriteType.DDL_SHARED;
       }
 
-      Task t = createTableTask(tblDesc);
+      Task t = createTableTask(tblDesc, x);
       table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
 
       if (!replicationSpec.isMetadataOnly()) {
         if (isPartitioned(tblDesc)) {
           for (AddPartitionDesc addPartitionDesc : partitionDescs) {
             t.addDependentTask(
-                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
           }
         } else {
-          LOG.debug("adding dependent CopyWork/MoveWork for table");
-          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation())));
+          x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
+          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x));
         }
       }
       if (dr == null){
         // Simply create
-        rootTasks.add(t);
+        x.getTasks().add(t);
       } else {
         // Drop and recreate
         dr.addDependentTask(t);
-        rootTasks.add(dr);
+        x.getTasks().add(dr);
       }
     } else {
       // Table existed, and is okay to replicate into, not dropping and re-creating.
       if (table.isPartitioned()) {
-        LOG.debug("table partitioned");
+        x.getLOG().debug("table partitioned");
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
 
           Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
 
-          if ((ptn = db.getPartition(table, partSpec, false)) == null) {
+          if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
             if (!replicationSpec.isMetadataOnly()){
-              rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+              x.getTasks().add(addSinglePartition(
+                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
             }
           } else {
             // If replicating, then the partition already existing means we need to replace, maybe, if
             // the destination ptn's repl.last.id is older than the replacement's.
             if (replicationSpec.allowReplacementInto(ptn)){
               if (!replicationSpec.isMetadataOnly()){
-                rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+                x.getTasks().add(addSinglePartition(
+                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
               } else {
-                rootTasks.add(alterSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn));
+                x.getTasks().add(alterSinglePartition(
+                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
               }
               if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
                 lockType = WriteEntity.WriteType.DDL_SHARED;
@@ -862,31 +912,31 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){
           // MD-ONLY table alter
-          rootTasks.add(alterTableTask(tblDesc));
+          x.getTasks().add(alterTableTask(tblDesc, x));
           if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
             lockType = WriteEntity.WriteType.DDL_SHARED;
           }
         }
       } else {
-        LOG.debug("table non-partitioned");
+        x.getLOG().debug("table non-partitioned");
         if (!replicationSpec.allowReplacementInto(table)){
           return; // silently return, table is newer than our replacement.
         }
         if (!replicationSpec.isMetadataOnly()) {
-          loadTable(fromURI, table, true, new Path(fromURI)); // repl-imports are replace-into
+          loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
         } else {
-          rootTasks.add(alterTableTask(tblDesc));
+          x.getTasks().add(alterTableTask(tblDesc, x));
         }
         if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
           lockType = WriteEntity.WriteType.DDL_SHARED;
         }
       }
     }
-    outputs.add(new WriteEntity(table,lockType));
+    x.getOutputs().add(new WriteEntity(table,lockType));
 
   }
 
-  private boolean isPartitioned(CreateTableDesc tblDesc) {
+  private static boolean isPartitioned(CreateTableDesc tblDesc) {
     return !(tblDesc.getPartCols() == null || tblDesc.getPartCols().isEmpty());
   }
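
The most substantive addition earlier in this file's diff is the location fallback taken when waitOnCreateDb is set: the parent database may not exist yet, so the table path cannot be derived from a Database object. A compact sketch of the two branches, reusing the metastore calls visible in the patch (getTablePath, getDnsPath, getDefaultDatabasePath, encodeTableName); the standalone method is illustrative only, not the committed analyzer code.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.Database;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class ReplTableLocationSketch {
      /**
       * If the parent database already exists, default the table location from its warehouse
       * path as the regular import flow does; if a precursor REPL LOAD task will create the
       * database later (waitOnCreateDb), fall back to <default db path>/<encoded table name>.
       */
      static String defaultLocation(Warehouse wh, Database parentDb, String dbName,
          String tblName, boolean waitOnCreateDb) throws MetaException {
        if (!waitOnCreateDb) {
          return wh.getTablePath(parentDb, tblName).toString();
        }
        return wh.getDnsPath(new Path(
            wh.getDefaultDatabasePath(dbName),
            MetaStoreUtils.encodeTableName(tblName.toLowerCase()))).toString();
      }
    }
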
 
@@ -894,7 +944,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
    * Utility method that returns a table if one corresponding to the destination
    * tblDesc is found. Returns null if no such table is found.
    */
-   private Table tableIfExists(CreateTableDesc tblDesc) throws HiveException {
+   private static Table tableIfExists(CreateTableDesc tblDesc, Hive db) throws HiveException {
     try {
       return db.getTable(tblDesc.getDatabaseName(),tblDesc.getTableName());
     } catch (InvalidTableException e) {
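
Most of the remaining churn in this file comes from routing state through a single EximUtil.SemanticAnalyzerWrapperContext argument instead of the analyzer's own fields, which lets the helpers become static and be reused outside ImportSemanticAnalyzer. A rough outline of such a wrapper: the accessor names match the calls seen in the hunks (getHive, getTasks, getOutputs, getLOG), while the constructor and the logger type are assumptions, not the class actually introduced by this patch.

    import java.io.Serializable;
    import java.util.HashSet;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.slf4j.Logger;

    public class AnalyzerWrapperContextSketch {
      private final Hive db;
      private final List<Task<? extends Serializable>> tasks;
      private final HashSet<WriteEntity> outputs;
      private final Logger log;

      public AnalyzerWrapperContextSketch(Hive db, List<Task<? extends Serializable>> tasks,
          HashSet<WriteEntity> outputs, Logger log) {
        this.db = db;
        this.tasks = tasks;
        this.outputs = outputs;
        this.log = log;
      }

      public Hive getHive() { return db; }
      public List<Task<? extends Serializable>> getTasks() { return tasks; }
      public HashSet<WriteEntity> getOutputs() { return outputs; }
      public Logger getLOG() { return log; }
    }
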

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index a7005f1..6726d44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -69,7 +69,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
       @Override
       public boolean accept(Path p) {
         String name = p.getName();
-        return name.equals("_metadata") ? true : !name.startsWith("_") && !name.startsWith(".");
+        return name.equals(EximUtil.METADATA_NAME) ? true : !name.startsWith("_") && !name.startsWith(".");
       }
     });
     if ((srcs != null) && srcs.length == 1) {
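
The one-line change above only swaps the "_metadata" literal for the shared EximUtil constant. For reference, a self-contained version of the filter it sits in; the constant's value is assumed from the literal it replaces.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class MetadataAwareHiddenFileFilter implements PathFilter {
      // Assumed value of EximUtil.METADATA_NAME, taken from the replaced literal.
      private static final String METADATA_NAME = "_metadata";

      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        // Always admit the export metadata file; otherwise skip hidden/underscore-prefixed names.
        return name.equals(METADATA_NAME) || (!name.startsWith("_") && !name.startsWith("."));
      }
    }
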

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
index a17696a..db624c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.metastore.events.PreDropTableEvent;
 import org.apache.hadoop.hive.metastore.events.PreEventContext;
 import org.apache.hadoop.hive.metastore.events.PreEventContext.PreEventType;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+
 /**
  * This class listens for drop events and, if set, exports the table's metadata as JSON to the trash
  * of the user performing the drop
@@ -83,7 +83,7 @@ public class MetaDataExportListener extends MetaStorePreEventListener {
     } catch (IOException e) {
       throw new MetaException(e.getMessage());
     }
-    Path outFile = new Path(metaPath, name + ImportSemanticAnalyzer.METADATA_NAME);
+    Path outFile = new Path(metaPath, name + EximUtil.METADATA_NAME);
     try {
       SessionState.getConsole().printInfo("Beginning metadata export");
       EximUtil.createExportDump(fs, outFile, mTbl, null, null);
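
The same consolidation applies here: the drop-event listener now names its dump file via EximUtil.METADATA_NAME rather than a constant on ImportSemanticAnalyzer. A minimal illustration of the resulting path shape; the variable names stand in for the listener's locals and the constant's value is again assumed to be "_metadata".

    import org.apache.hadoop.fs.Path;

    public class MetadataDumpPathSketch {
      private static final String METADATA_NAME = "_metadata"; // assumed EximUtil.METADATA_NAME

      /** e.g. dumpFile(trashDir, "db.table") resolves to <trashDir>/db.table_metadata */
      static Path dumpFile(Path metaPath, String qualifiedTableName) {
        return new Path(metaPath, qualifiedTableName + METADATA_NAME);
      }
    }
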