Posted to commits@hive.apache.org by se...@apache.org on 2016/11/15 20:21:03 UTC

[31/50] [abbrv] hive git commit: HIVE-15151: Bootstrap support for replv2 (Sushanth Sowmyan reviewed by Vaibhav Gumashta)

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..a4dfa3a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
+
+public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
+  // Database name or pattern
+  private String dbNameOrPattern;
+  // Table name or pattern
+  private String tblNameOrPattern;
+  private Integer eventFrom;
+  private Integer eventTo;
+  private Integer batchSize;
+  // Base path for REPL LOAD
+  private String path;
+
+  public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
+    super(queryState);
+  }
+
+  @Override
+  public void analyzeInternal(ASTNode ast) throws SemanticException {
+    LOG.debug("ReplicationSemanticAanalyzer: analyzeInternal");
+    LOG.debug(ast.getName() + ":" + ast.getToken().getText() + "=" + ast.getText());
+    switch (ast.getToken().getType()) {
+    case TOK_REPL_DUMP: {
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
+      initReplDump(ast);
+      analyzeReplDump(ast);
+      break;
+    }
+    case TOK_REPL_LOAD: {
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
+      initReplLoad(ast);
+      analyzeReplLoad(ast);
+      break;
+    }
+    case TOK_REPL_STATUS: {
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
+      initReplStatus(ast);
+      analyzeReplStatus(ast);
+      break;
+    }
+    default: {
+      throw new SemanticException("Unexpected root token");
+    }
+    }
+  }
+
+  private void initReplDump(ASTNode ast) {
+    int numChildren = ast.getChildCount();
+    dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    // skip the first node, which is always required
+    int currNode = 1;
+    while (currNode < numChildren) {
+      if (ast.getChild(currNode).getType() != TOK_FROM) {
+        // optional tblName was specified.
+        tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getText());
+      } else {
+        // TOK_FROM subtree
+        Tree fromNode = ast.getChild(currNode);
+        eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
+        // skip the first, which is always required
+        int numChild = 1;
+        while (numChild < fromNode.getChildCount()) {
+          if (fromNode.getChild(numChild).getType() == TOK_TO) {
+            eventTo =
+                Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+            // skip the next child, since we already took care of it
+            numChild++;
+          } else if (fromNode.getChild(numChild).getType() == TOK_BATCH) {
+            batchSize =
+                Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+            // skip the next child, since we already took care of it
+            numChild++;
+          }
+          // move to the next child in FROM tree
+          numChild++;
+        }
+        // FROM node is always the last
+        break;
+      }
+      // move to the next root node
+      currNode++;
+    }
+  }
+
+  // REPL DUMP
+  private void analyzeReplDump(ASTNode ast) throws SemanticException {
+    // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
+        + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
+        + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
+    String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+    Path dumpRoot = new Path(replRoot, getNextDumpDir());
+    try {
+      for (String dbName : matchesDb(dbNameOrPattern)) {
+        LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+        Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+        for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
+          LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
+              + " to db root " + dbRoot.toUri());
+          dumpTbl(ast, dbName, tblName, dbRoot);
+        }
+      }
+      String currentReplId =
+          String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
+          "dump_dir,last_repl_id#string,string");
+    } catch (Exception e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+  }
+
+  String getNextDumpDir() {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      return "next";
+      // Return a fixed name instead of generating a unique id, to make unit tests easy to write.
+      // However, this does mean that in tests we have to be aware that repl dump will clash
+      // with prior dumps, and thus have to clean up properly.
+    } else {
+      return String.valueOf(System.currentTimeMillis());
+      // TODO: time is good enough for now - we'll likely improve this.
+      // We may also work in something equivalent to the pid and thread id, and move to nanos
+      // to ensure uniqueness.
+    }
+  }
+
+  /**
+   * Dumps the metadata of a single database into the dump directory.
+   *
+   * @param dbName name of the database to dump
+   * @param dumpRoot root directory of this repl dump
+   * @return db dumped path
+   * @throws SemanticException
+   */
+  private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
+    Path dbRoot = new Path(dumpRoot, dbName);
+    try {
+      // TODO : instantiating FS objects is generally costly. Refactor
+      FileSystem fs = dbRoot.getFileSystem(conf);
+      Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
+      Database dbObj = db.getDatabase(dbName);
+      EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
+    } catch (Exception e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+    return dbRoot;
+  }
+
+  /**
+   * Dumps a single table, reusing the EXPORT analysis machinery.
+   *
+   * @param ast the REPL DUMP AST node
+   * @param dbName name of the database the table belongs to
+   * @param tblName name of the table to dump
+   * @param dbRoot dump directory of the enclosing database
+   * @return tbl dumped path
+   * @throws SemanticException
+   */
+  private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException {
+    Path tableRoot = new Path(dbRoot, tblName);
+    try {
+      URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
+      TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+      ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
+          rootTasks, inputs, outputs, LOG);
+    } catch (HiveException e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+    return tableRoot;
+  }
+
+  // REPL LOAD
+  private void initReplLoad(ASTNode ast) {
+    int numChildren = ast.getChildCount();
+    path = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    if (numChildren > 1) {
+      dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+    }
+    if (numChildren > 2) {
+      tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(2).getText());
+    }
+  }
+
+  /*
+   * Example dump dirs we need to be able to handle :
+   *
+   * for: hive.repl.rootdir = staging/
+   * Then, repl dumps will be created in staging/<dumpdir>
+   *
+   * single-db-dump: staging/blah12345
+   *   blah12345/
+   *     default/
+   *       _metadata
+   *       tbl1/
+   *         _metadata
+   *         dt=20160907/
+   *           _files
+   *       tbl2/
+   *       tbl3/
+   *       unptn_tbl/
+   *         _metadata
+   *         _files
+   *
+   * multi-db-dump: staging/bar12347
+   *   staging/
+   *     bar12347/
+   *       default/
+   *         ...
+   *       sales/
+   *         ...
+   *
+   * single table-dump: staging/baz123
+   *   staging/
+   *     baz123/
+   *       _metadata
+   *       dt=20150931/
+   *         _files
+   */
+  private void analyzeReplLoad(ASTNode ast) throws SemanticException {
+    LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
+        + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path));
+
+    // for analyze repl load, we walk through the dir structure available in the path,
+    // looking at each db, and then each table, and then setting up the appropriate
+    // import job in its place.
+
+    // FIXME : handle non-bootstrap cases.
+
+    // We look at the path, and go through each subdir.
+    // Each subdir corresponds to a database.
+    // For each subdir, there is a _metadata file which allows us to recreate the db object.
+    // Once each db object is created, we iterate through the sub-table dirs and set up what
+    // is effectively an IMPORT of each of them into that db.
+
+    try {
+
+      Path loadPath = new Path(path);
+      final FileSystem fs = loadPath.getFileSystem(conf);
+
+      if (!fs.exists(loadPath)) {
+        // supposed dump path does not exist.
+        throw new FileNotFoundException(loadPath.toUri().toString());
+      }
+
+      // Now, the dumped path can be one of two things:
+      // a) It can be a db dump, in which case we expect a set of dirs, each with a
+      // db name, and with a _metadata file in each, and table dirs inside that.
+      // b) It can be a table dump dir, in which case we expect a _metadata dump of
+      // a table in question in the dir, and individual ptn dir hierarchy.
+      // Once we expand this into doing incremental repl, we can have individual events which can
+      // be other things like roles and fns as well. Also, if tblname is specified, we're guaranteed
+      // that this is a tbl-level dump, and it is an error condition if we find anything else. Also,
+      // if dbname is specified, we expect exactly one db dumped, and having more is an error
+      // condition.
+
+      if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+        analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
+        return;
+      }
+
+      FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
+      if (srcs == null || (srcs.length == 0)) {
+        throw new FileNotFoundException(loadPath.toUri().toString());
+      }
+
+      FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
+
+      if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
+        throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
+      }
+
+      if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
+        LOG.debug("Found multiple dirs when we expected 1:");
+        for (FileStatus d : dirsInLoadPath) {
+          LOG.debug("> " + d.getPath().toUri().toString());
+        }
+        throw new IllegalArgumentException(
+            "Multiple dirs in "
+                + loadPath.toUri().toString()
+                + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
+      }
+
+      for (FileStatus dir : dirsInLoadPath) {
+        analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+      }
+
+    } catch (Exception e) {
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+      throw new SemanticException(e);
+    }
+
+  }
+
+  private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
+      throws SemanticException {
+    try {
+      // Path being passed to us is a db dump location. We go ahead and load as needed.
+      // dbName might be null or empty, in which case we keep the original db name for the new
+      // database creation
+
+      // Two steps here - first, we read the _metadata file here, and create a CreateDatabaseDesc
+      // associated with that
+      // Then, we iterate over all subdirs, and create table imports for each.
+
+      EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+      try {
+        rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
+      } catch (IOException e) {
+        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+      }
+
+      Database dbObj = rv.getDatabase();
+
+      if (dbObj == null) {
+        throw new IllegalArgumentException(
+            "_metadata file read did not contain a db object - invalid dump.");
+      }
+
+      if ((dbName == null) || (dbName.isEmpty())) {
+        // dbName was not specified (or was empty), so fall back to the original name
+        // recorded in the dumped thrift object.
+        dbName = dbObj.getName();
+      }
+
+      CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
+      createDbDesc.setName(dbName);
+      createDbDesc.setComment(dbObj.getDescription());
+      createDbDesc.setDatabaseProperties(dbObj.getParameters());
+      // note that we do not set location - for repl load, we want that auto-created.
+
+      createDbDesc.setIfNotExists(false);
+      // If it exists, we want this to be an error condition. Repl Load is not intended to replace a
+      // db.
+      // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on.
+      Task<? extends Serializable> createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf);
+      rootTasks.add(createDbTask);
+
+      FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
+
+      for (FileStatus tableDir : dirsInDbPath) {
+        analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+      }
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private void analyzeTableLoad(String dbName, String tblName, String locn,
+      Task<? extends Serializable> precursor) throws SemanticException {
+    // Path being passed to us is a table dump location. We go ahead and load it in as needed.
+    // If tblName is null, we default to the table name specified in the dump's _metadata.
+    // If both dbName and tblName are specified, that is the name the new table is created as.
+    if (dbName == null || dbName.isEmpty()) {
+      throw new SemanticException("Database name cannot be null for a table load");
+    }
+    try {
+      // no location set on repl loads
+      boolean isLocationSet = false;
+      // all repl imports are non-external
+      boolean isExternalSet = false;
+      // bootstrap loads are not partition level
+      boolean isPartSpecSet = false;
+      // repl loads are not partition level
+      LinkedHashMap<String, String> parsedPartSpec = null;
+      // no location for repl imports
+      String parsedLocation = null;
+      boolean waitOnCreateDb = false;
+      List<Task<? extends Serializable>> importTasks = null;
+      if (precursor == null) {
+        importTasks = rootTasks;
+        waitOnCreateDb = false;
+      } else {
+        importTasks = new ArrayList<Task<? extends Serializable>>();
+        waitOnCreateDb = true;
+      }
+      EximUtil.SemanticAnalyzerWrapperContext x =
+          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
+              ctx);
+      ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
+          waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
+
+      if (precursor != null) {
+        for (Task<? extends Serializable> t : importTasks) {
+          precursor.addDependentTask(t);
+        }
+      }
+
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  // REPL STATUS
+  private void initReplStatus(ASTNode ast) {
+    int numChildren = ast.getChildCount();
+    dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    if (numChildren > 1) {
+      tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+    }
+  }
+
+  private void analyzeReplStatus(ASTNode ast) throws SemanticException {
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern)
+        + "." + String.valueOf(tblNameOrPattern));
+
+    String replLastId = null;
+
+    try {
+      if (tblNameOrPattern != null) {
+        // Checking for status of table
+        Table tbl = db.getTable(dbNameOrPattern, tblNameOrPattern);
+        if (tbl != null) {
+          inputs.add(new ReadEntity(tbl));
+          Map<String, String> params = tbl.getParameters();
+          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+          }
+        }
+      } else {
+        // Checking for status of a db
+        Database database = db.getDatabase(dbNameOrPattern);
+        if (database != null) {
+          inputs.add(new ReadEntity(database));
+          Map<String, String> params = database.getParameters();
+          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+          }
+        }
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+                                      // codes
+    }
+
+    LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
+        + ctx.getResFile());
+    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+  }
+
+  private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
+    LOG.debug("prepareReturnValues : " + schema);
+    for (String s : values) {
+      LOG.debug("    > " + s);
+    }
+
+    ctx.setResFile(ctx.getLocalTmpPath());
+    // FIXME : this should not be accessible by the user if we write to it from the frontend.
+    // Thus, we should Desc/Work this, otherwise there is a security issue here.
+    // Note: if we don't call ctx.setResFile, we get an NPE from the standard fetch path below.
+    // If we do call it, FetchWork winds up treating this as a partitioned dir, which does not
+    // work either - hence the direct writeOutput() below.
+
+    writeOutput(values);
+  }
+
+  private void writeOutput(List<String> values) throws SemanticException {
+    Path outputFile = ctx.getResFile();
+    FileSystem fs = null;
+    DataOutputStream outStream = null;
+    try {
+      fs = outputFile.getFileSystem(conf);
+      outStream = fs.create(outputFile);
+      outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+      for (int i = 1; i < values.size(); i++) {
+        outStream.write(Utilities.ctrlaCode);
+        outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+      }
+      outStream.write(Utilities.newLineCode);
+    } catch (IOException e) {
+      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+                                      // codes
+    } finally {
+      IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
+                                      // replace with this
+    }
+  }
+
+  private ReplicationSpec getNewReplicationSpec() throws SemanticException {
+    try {
+      ReplicationSpec replicationSpec =
+          new ReplicationSpec(true, false, "replv2", "will-be-set", false, true);
+      replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC()
+          .getCurrentNotificationEventId().getEventId()));
+      return replicationSpec;
+    } catch (Exception e) {
+      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+                                      // codes
+    }
+  }
+
+  private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
+      throws HiveException {
+    if (tblPattern == null) {
+      return db.getAllTables(dbName);
+    } else {
+      return db.getTablesByPattern(dbName, tblPattern);
+    }
+  }
+
+  private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
+    if (dbPattern == null) {
+      return db.getAllDatabases();
+    } else {
+      return db.getDatabasesByPattern(dbPattern);
+    }
+  }
+
+}
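
A rough sketch, for context, of driving one bootstrap cycle through this analyzer, in the same style as the parser tests at the end of this commit (queryState and conf come from a configured session as in those tests; the dump path used for REPL LOAD and the exact REPL STATUS command text are assumptions here, not taken from this patch):

    ParseDriver pd = new ParseDriver();

    // 1. REPL DUMP: analyzeReplDump() walks the source db and returns dump_dir, last_repl_id.
    ASTNode dump = (ASTNode) pd.parse("repl dump default").getChild(0);
    SemanticAnalyzerFactory.get(queryState, dump).analyze(dump, new Context(conf));

    // 2. REPL LOAD: analyzeReplLoad() walks the dump dir and chains IMPORT-style tasks
    //    behind a CREATE DATABASE task for the target db.
    ASTNode load = (ASTNode) pd.parse("repl load default_bak from 'staging/next'").getChild(0);
    SemanticAnalyzerFactory.get(queryState, load).analyze(load, new Context(conf));

    // 3. REPL STATUS: analyzeReplStatus() reads repl.last.id off the target and returns it.
    ASTNode status = (ASTNode) pd.parse("repl status default_bak").getChild(0);
    SemanticAnalyzerFactory.get(queryState, status).analyze(status, new Context(conf));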

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 4668271..824cf11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -42,6 +42,7 @@ public class ReplicationSpec {
   private String eventId = null;
   private String currStateId = null;
   private boolean isNoop = false;
+  private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
 
 
   // Key definitions related to replication
@@ -49,8 +50,9 @@ public class ReplicationSpec {
     REPL_SCOPE("repl.scope"),
     EVENT_ID("repl.event.id"),
     CURR_STATE_ID("repl.last.id"),
-    NOOP("repl.noop");
-
+    NOOP("repl.noop"),
+    LAZY("repl.lazy"),
+    ;
     private final String keyName;
 
     KEY(String s) {
@@ -102,32 +104,32 @@ public class ReplicationSpec {
     this((ASTNode)null);
   }
 
-  public  ReplicationSpec(
-      boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState,
-      String currentReplicationState, boolean isNoop){
+  public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
+      String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
+    this.isLazy = isLazy;
   }
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
     String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
     this.isMetadataOnly = false;
     this.isInReplicationScope = false;
-    if (scope != null){
-      if (scope.equalsIgnoreCase("metadata")){
+    if (scope != null) {
+      if (scope.equalsIgnoreCase("metadata")) {
         this.isMetadataOnly = true;
         this.isInReplicationScope = true;
-      } else if (scope.equalsIgnoreCase("all")){
+      } else if (scope.equalsIgnoreCase("all")) {
         this.isInReplicationScope = true;
       }
     }
     this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
     this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-    this.isNoop = Boolean.parseBoolean(
-        keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+    this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+    this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
   }
 
   /**
@@ -280,6 +282,21 @@ public class ReplicationSpec {
     this.isNoop = isNoop;
   }
 
+  /**
+   * @return whether or not the current replication action is set to be lazy
+   */
+  public boolean isLazy() {
+    return isLazy;
+  }
+
+  /**
+   * @param isLazy whether or not the current replication action should be lazy
+   */
+  public void setLazy(boolean isLazy){
+    this.isLazy = isLazy;
+  }
+
+
   public String get(KEY key) {
     switch (key){
       case REPL_SCOPE:
@@ -297,6 +314,8 @@ public class ReplicationSpec {
         return getCurrentReplicationState();
       case NOOP:
         return String.valueOf(isNoop());
+      case LAZY:
+        return String.valueOf(isLazy());
     }
     return null;
   }
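
A minimal, hedged sketch of how the lazy flag added above is expected to round-trip through the key-fetcher constructor (the Table object and its parameters map are illustrative; only the ReplicationSpec API shown in this diff is relied on):

    // Rebuild a ReplicationSpec from the repl.* properties stamped on a replicated object.
    Map<String, String> params = tbl.getParameters();   // e.g. repl.scope, repl.last.id, repl.lazy
    ReplicationSpec spec = new ReplicationSpec(key -> params.get(key));

    if (spec.isLazy()) {
      // Lazy mode: the dump only wrote a _files listing; the eventual copy on the
      // destination side is expected to pull the actual data in.
    }

    String lastReplId = spec.get(ReplicationSpec.KEY.CURR_STATE_ID);  // "repl.last.id"

Since Boolean.parseBoolean(null) is false, any object dumped without the repl.lazy property simply parses as isLazy() == false, which keeps older dumps readable.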

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index ed01a31..520d3de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -128,6 +128,10 @@ public final class SemanticAnalyzerFactory {
     commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT);
     commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK);
     commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT);
+    commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now
+    commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now
+    commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE
+
   }
 
   static {
@@ -185,6 +189,12 @@ public final class SemanticAnalyzerFactory {
         return new ExportSemanticAnalyzer(queryState);
       case HiveParser.TOK_IMPORT:
         return new ImportSemanticAnalyzer(queryState);
+      case HiveParser.TOK_REPL_DUMP:
+        return new ReplicationSemanticAnalyzer(queryState);
+      case HiveParser.TOK_REPL_LOAD:
+        return new ReplicationSemanticAnalyzer(queryState);
+      case HiveParser.TOK_REPL_STATUS:
+        return new ReplicationSemanticAnalyzer(queryState);
       case HiveParser.TOK_ALTERTABLE: {
         Tree child = tree.getChild(1);
         switch (child.getType()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
new file mode 100644
index 0000000..1932d60
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+/**
+ * Marker work for Replication - behaves similarly to CopyWork, but maps to ReplCopyTask,
+ * which, if so configured, lists the files at the source that are to be written to the
+ * destination instead of copying them, falling back to copying if needed.
+ */
+@Explain(displayName = "Copy for Replication", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class ReplCopyWork extends CopyWork {
+
+  protected boolean copyFiles = true; // governs copy-or-list-files behaviour.
+  // If set to true, behaves identically to a CopyWork
+  // If set to false, ReplCopyTask does a file-list of the things to be copied instead, and puts them in a file called _files.
+  // Default is set to mimic CopyTask, with the intent that any Replication code will explicitly flip this.
+
+  /**
+   * TODO : Refactor
+   *
+   * There is an upcoming patch that refactors this bit of code. Currently, the idea is the following:
+   *
+   * By default, ReplCopyWork will behave similarly to CopyWork, and simply copy
+   * along data from the source to destination. If, however, listFilesOnOutput is set,
+   * then, instead of copying the individual files to the destination, it simply creates
+   * a file called _files on destination that contains the list of the original files
+   * that were intended to be copied. Thus, we do not actually copy the files at CopyWork
+   * time.
+   *
+   * The flip side of this behaviour happens when, instead, readListFromInput is set. This
+   * flag, if set, changes the source behaviour of this CopyTask, and instead of copying
+   * explicit files, this will then fall back to a behaviour wherein an _files is read from
+   * the source, and the files specified by the _files are then copied to the destination.
+   *
+   * This gives us the lazy-copy-on-source and pull-from-destination semantics that we want
+   * to use for replication.
+   *
+   * ==
+   *
+   * The refactor intent, however, is to simplify this, so that we have only 1 flag that we set,
+   * called isLazy. If isLazy is set, then this is the equivalent of the current listFilesOnOutput,
+   * and will generate a _files file.
+   *
+   * As to the input, we simply decide on whether to use the lazy mode or not depending on the
+   * presence of a _files file on the input. If we see a _files on the input, we simply expand it
+   * to copy as needed. If we do not, we copy as normal.
+   *
+   */
+
+  protected boolean listFilesOnOutput = false; // governs copy-or-list-files behaviour
+  // If set to true, it'll iterate over input files, and for each file in the input,
+  //   it'll write out an additional line in a _files file in the output.
+  // If set to false, it'll behave as a traditional CopyTask.
+
+  protected boolean readListFromInput = false; // governs remote-fetch-input behaviour
+  // If set to true, we'll assume that the input has a _files file present which lists
+  //   the actual input files to copy, and we'll pull each of those on read.
+  // If set to false, it'll behave as a traditional CopyTask.
+
+  public ReplCopyWork() {
+  }
+
+  public ReplCopyWork(final Path fromPath, final Path toPath) {
+    super(fromPath, toPath, true);
+  }
+
+  public ReplCopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+    super(fromPath, toPath, errorOnSrcEmpty);
+  }
+
+  public void setListFilesOnOutputBehaviour(boolean listFilesOnOutput){
+    this.listFilesOnOutput = listFilesOnOutput;
+  }
+
+  public boolean getListFilesOnOutputBehaviour(){
+    return this.listFilesOnOutput;
+  }
+
+  public void setReadListFromInput(boolean readListFromInput){
+    this.readListFromInput = readListFromInput;
+  }
+
+  public boolean getReadListFromInput(){
+    return this.readListFromInput;
+  }
+
+  // Specialization of getListFilesOnOutputBehaviour with a FileStatus arg.
+  // We can fall back to the plain getListFilesOnOutputBehaviour(), or do additional
+  // pattern matching to decide that certain files - _metadata files, for instance -
+  // should not be listed but copied instead.
+  // Currently, we use this to skip _metadata files, but we might decide that
+  // this is not the right place for it later on.
+  public boolean getListFilesOnOutputBehaviour(FileStatus f) {
+    if (f.getPath().toString().contains("_metadata")){
+      return false; // always copy _metadata files
+    }
+    return this.listFilesOnOutput;
+  }
+}
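
A hedged sketch of the two modes described in the comments above, as a caller setting up replication copies might wire them up (the paths are illustrative, and the TaskFactory registration of ReplCopyWork to ReplCopyTask is assumed to be done elsewhere in this patch series):

    // Dump side: do not move data; emit a _files manifest listing the source files.
    ReplCopyWork dumpCopy = new ReplCopyWork(tableDataPath, dumpDataPath);
    dumpCopy.setListFilesOnOutputBehaviour(true);
    Task<? extends Serializable> dumpTask = TaskFactory.get(dumpCopy, conf);

    // Load side: the input is expected to contain a _files manifest; expand it and
    // pull each listed file over to the destination.
    ReplCopyWork loadCopy = new ReplCopyWork(dumpDataPath, tableDataPath);
    loadCopy.setReadListFromInput(true);
    Task<? extends Serializable> loadTask = TaskFactory.get(loadCopy, conf);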

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..54b2bd1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.*;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestReplicationSemanticAnalyzer {
+  static QueryState queryState;
+  static HiveConf conf;
+  static String defaultDB = "default";
+  static String tblName = "testReplSA";
+  static ArrayList<String> cols =  new ArrayList<String>(Arrays.asList("col1", "col2"));
+  ParseDriver pd;
+  SemanticAnalyzer sA;
+
+
+  @BeforeClass
+  public static void initialize() throws HiveException {
+    queryState = new QueryState(new HiveConf(SemanticAnalyzer.class));
+    conf = queryState.getConf();
+    conf.set("hive.security.authorization.manager", "");
+    SessionState.start(conf);
+    Hive hiveDb = Hive.get(conf);
+    hiveDb.createTable(defaultDB + "." + tblName, cols, null, OrcInputFormat.class, OrcOutputFormat.class);
+    Table t = hiveDb.getTable(tblName);
+  }
+
+  @AfterClass
+  public static void teardown() throws HiveException {
+  }
+
+  @Test
+  public void testReplDumpParse() throws Exception {
+    ParseDriver pd = new ParseDriver();
+    String fromEventId = "100";
+    String toEventId = "200";
+    String batchSize = "50";
+    ASTNode root;
+    ASTNode child;
+
+    String query = "repl dump " + defaultDB;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_DUMP");
+    assertEquals(root.getChildCount(), 1);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl dump " + defaultDB + "." + tblName;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 2);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    root =  (ASTNode) root.getChild(2);
+    assertEquals(root.getText(), "TOK_FROM");
+    assertEquals(root.getChildCount(), 1);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), fromEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId;
+
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    root =  (ASTNode) root.getChild(2);
+    assertEquals(root.getText(), "TOK_FROM");
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), fromEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), "TOK_TO");
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(2);
+    assertEquals(child.getText(), toEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    query =
+        "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId
+            + " batch " + batchSize;
+
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getChildCount(), 3);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), defaultDB);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), tblName);
+    assertEquals(child.getChildCount(), 0);
+
+    root =  (ASTNode) root.getChild(2);
+    assertEquals(root.getText(), "TOK_FROM");
+    assertEquals(root.getChildCount(), 5);
+
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), fromEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), "TOK_TO");
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(2);
+    assertEquals(child.getText(), toEventId);
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(3);
+    assertEquals(child.getText(), "TOK_BATCH");
+    assertEquals(child.getChildCount(), 0);
+
+    child =  (ASTNode) root.getChild(4);
+    assertEquals(child.getText(), batchSize);
+    assertEquals(child.getChildCount(), 0);
+  }
+
+  @Test
+  public void testReplLoadParse() throws Exception {
+    // FileSystem fs = FileSystem.get(conf);
+    ParseDriver pd = new ParseDriver();
+    ASTNode root;
+    ASTNode child;
+    String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+    Path dumpRoot = new Path(replRoot, "next");
+    System.out.println(replRoot);
+    System.out.println(dumpRoot);
+    String newDB = "default_bak";
+
+    String query = "repl load  from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_LOAD");
+    assertEquals(root.getChildCount(), 1);
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+    assertEquals(child.getChildCount(), 0);
+
+    query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_LOAD");
+    assertEquals(root.getChildCount(), 2);
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+    assertEquals(child.getChildCount(), 0);
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), newDB);
+    assertEquals(child.getChildCount(), 0);
+  }
+
+  // TODO: add this test after repl dump analyze generates tasks
+  //@Test
+  public void testReplDumpAnalyze() throws Exception {
+
+  }
+
+  //@Test
+  public void testReplLoadAnalyze() throws Exception {
+    ParseDriver pd = new ParseDriver();
+    ASTNode root;
+    String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+    FileSystem fs = FileSystem.get(conf);
+    Path dumpRoot = new Path(replRoot, "next");
+    System.out.println(replRoot);
+    System.out.println(dumpRoot);
+    String newDB = "default_bak";
+
+    // First create a dump
+    String query = "repl dump " + defaultDB;
+    root = (ASTNode) pd.parse(query).getChild(0);
+    ReplicationSemanticAnalyzer rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+    rs.analyze(root, new Context(conf));
+
+    // Then analyze load
+    query = "repl load  from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+    rs.analyze(root, new Context(conf));
+    List<Task<? extends Serializable>> roots = rs.getRootTasks();
+    assertEquals(1, roots.size());
+
+    query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+    rs.analyze(root, new Context(conf));
+    roots = rs.getRootTasks();
+    assertEquals(1, roots.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
index 03515ea..0caa42a 100644
--- a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
+++ b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
@@ -19,4 +19,4 @@ POSTHOOK: type: LOAD
 #### A masked pattern was here ####
 POSTHOOK: Output: default@exim_department
 #### A masked pattern was here ####
-FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile
+FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile,file