You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/12/06 19:21:47 UTC

hive git commit: HIVE-15332: REPL LOAD & DUMP support for incremental CREATE_TABLE/ADD_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)

Repository: hive
Updated Branches:
  refs/heads/master 0c8edf053 -> 2f5889c9b


HIVE-15332: REPL LOAD & DUMP support for incremental CREATE_TABLE/ADD_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f5889c9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f5889c9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f5889c9

Branch: refs/heads/master
Commit: 2f5889c9b94977b064ba614c89684404cbb9ca63
Parents: 0c8edf0
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Tue Dec 6 11:16:12 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Tue Dec 6 11:16:12 2016 -0800

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |   1 +
 itests/hive-unit/pom.xml                        |   5 +
 .../hive/ql/TestReplicationScenarios.java       | 129 +++++++-
 .../hive/metastore/messaging/EventUtils.java    |  22 ++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  16 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  28 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  19 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 320 +++++++++++++++----
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |   8 +
 .../hadoop/hive/ql/plan/AddPartitionDesc.java   |  22 ++
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |  21 ++
 11 files changed, 516 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 494d01f..119801f 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -332,6 +332,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
   private void enqueue(NotificationEvent event) {
     if (rs != null) {
       synchronized(NOTIFICATION_TBL_LOCK) {
+        LOG.debug("DbNotif:Enqueueing : {}:{}",event.getEventId(),event.getMessage());
         rs.addNotificationEvent(event);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index cd209b4..6a190d1 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -81,6 +81,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-server-extensions</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-it-util</artifactId>
       <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 01abe9b..95db9e8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.Shell;
@@ -55,6 +56,7 @@ public class TestReplicationScenarios {
   static Driver driver;
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+  private ArrayList<String> lastResults;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -69,8 +71,8 @@ public class TestReplicationScenarios {
       WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf);
     }
 
-//    System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
-//        DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
+    System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
+        DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
     msPort = MetaStoreUtils.startMetaStore();
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
     hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
@@ -106,6 +108,12 @@ public class TestReplicationScenarios {
     // after each test
   }
 
+  private static int next = 0; // shared by every test in this class; value last handed to the analyzer
+  private static synchronized void advanceDumpDir() { // static so the lock actually guards the static counter
+    next++;
+    ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next)); // test hook: pass the new value on
+  }
+
   /**
    * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
    * Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -152,7 +160,7 @@ public class TestReplicationScenarios {
     run("SELECT * from " + dbName + ".unptned_empty");
     verifyResults(empty);
 
-
+    advanceDumpDir();
     run("REPL DUMP " + dbName);
     String replDumpLocn = getResult(0,0);
     run("REPL LOAD " + dbName + "_dupe FROM '"+replDumpLocn+"'");
@@ -169,15 +177,116 @@ public class TestReplicationScenarios {
     verifyResults(empty);
   }
 
+  @Test
+  public void testIncrementalAdds() throws IOException {
+    String testName = "incrementalAdds";
+    LOG.info("Testing "+testName);
+    String dbName = testName + "_" + tid;
+    // Bootstrap phase: create the source tables, dump them, and load the dump into <dbName>_dupe.
+    run("CREATE DATABASE " + dbName);
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+    // Changes made on the source after the bootstrap dump; the incremental dump below must carry them over.
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned_late");
+    verifyResults(unptn_data);
+
+
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(ptn_data_2);
+
+    // Table created only after the bootstrap dump, so it can reach the replica only via the incremental dump.
+    run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)");
+    run("SELECT a from " + dbName + ".ptned_late WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)");
+    run("SELECT a from " + dbName + ".ptned_late WHERE b=2");
+    verifyResults(ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId );
+    String incrementalDumpLocn = getResult(0,0);
+    String incrementalDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+    // From here on every query targets the replica (<dbName>_dupe), not the source database.
+    run("SELECT * from " + dbName + "_dupe.unptned_empty");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + "_dupe.ptned_empty");
+    verifyResults(empty);
+
+
+//  this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
+//  to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
+//  fixing that.
+//    run("SELECT * from " + dbName + "_dupe.unptned");
+//    verifyResults(unptn_data);
+    run("SELECT * from " + dbName + "_dupe.unptned_late");
+    verifyResults(unptn_data);
+
+
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(ptn_data_2);
+
+    // ptned_late did not exist at bootstrap time; finding its data here exercises the incremental path.
+    run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2");
+    verifyResults(ptn_data_2);
+  }
+
   private String getResult(int rowNum, int colNum) throws IOException {
-    List<String> results = new ArrayList<String>();
-    try {
-      driver.getResults(results);
-    } catch (CommandNeedRetryException e) {
-      e.printStackTrace();
-      throw new RuntimeException(e);
+    return getResult(rowNum,colNum,false); // reuse=false: always pull a fresh result set from the driver
+  }
+  private String getResult(int rowNum, int colNum, boolean reuse) throws IOException {
+    if (!reuse || lastResults == null) { // fetch when asked to, or when there is nothing cached to reuse yet
+      lastResults = new ArrayList<String>();
+      try {
+        driver.getResults(lastResults);
+      } catch (CommandNeedRetryException e) {
+        LOG.error("Unable to fetch results from driver", e); // use the class logger instead of stderr
+        throw new RuntimeException(e);
+      }
     }
-    return (results.get(rowNum).split("\\001"))[colNum];
+    return (lastResults.get(rowNum).split("\\001"))[colNum]; // row is split on the \001 character into columns
   }
 
   private void verifyResults(String[] data) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
index 932af7e..927bf15 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -63,6 +63,28 @@ public class EventUtils {
     };
   }
 
+  public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){
+    return new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        // Accepts ids in [eventFrom, eventTo], both inclusive; a null bound leaves that side open (no unboxing NPE).
+        if (event == null) { return false; }
+        final long id = event.getEventId();
+        return (eventFrom == null || id >= eventFrom) && (eventTo == null || id <= eventTo);
+      }
+    };
+  }
+
+  public static IMetaStoreClient.NotificationFilter andFilter(
+      final IMetaStoreClient.NotificationFilter filter1, final IMetaStoreClient.NotificationFilter filter2) {
+    return new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        // Conjunction of both filters; filter2 is consulted only when filter1 accepts the event.
+        if (!filter1.accept(event)) { return false; }
+        return filter2.accept(event);
+      }
+    };
+  }
 
   public interface NotificationFetcher {
     public int getBatchSize() throws IOException;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 0ac9053..4b39eb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4084,6 +4084,22 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     LOG.info("creating table " + tbl.getDbName() + "." + tbl.getTableName() + " on " +
             tbl.getDataLocation());
 
+    if (crtTbl.getReplicationSpec().isInReplicationScope() && (!crtTbl.getReplaceMode())){
+      // if this is a replication spec, then replace-mode semantics might apply.
+      // if we're already asking for a table replacement, then we can skip this check.
+      // however, otherwise, if in replication scope, and we've not been explicitly asked
+      // to replace, we should check if the object we're looking at exists, and if so,
+      // trigger replace-mode semantics.
+      Table existingTable = db.getTable(tbl.getDbName(), tbl.getTableName(), false);
+      if (existingTable != null){
+        if (!crtTbl.getReplicationSpec().allowEventReplacementInto(existingTable)){
+          return 0; // no replacement, the existing table state is newer than our update.
+        } else {
+          crtTbl.setReplaceMode(true); // we replace existing table.
+        }
+      }
+    }
+
     // create the table
     if (crtTbl.getReplaceMode()){
       // replace-mode creates are really alters using CreateTableDesc.

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8f230fc..e477f24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2016,18 +2016,40 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
     List<Partition> out = new ArrayList<Partition>();
     try {
-      if (!addPartitionDesc.getReplaceMode()){
+      if (!addPartitionDesc.getReplicationSpec().isInReplicationScope()){
         // TODO: normally, the result is not necessary; might make sense to pass false
         for (org.apache.hadoop.hive.metastore.api.Partition outPart
             : getMSC().add_partitions(in, addPartitionDesc.isIfNotExists(), true)) {
           out.add(new Partition(tbl, outPart));
         }
       } else {
-        getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), in, null);
-        List<String> part_names = new ArrayList<String>();
+
+        // For replication add-ptns, we need to follow an insert-if-not-exist, alter-if-exists scenario.
+        // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have
+        // no choice but to iterate over the partitions here.
+
+        List<org.apache.hadoop.hive.metastore.api.Partition> partsToAdd = new ArrayList<>();
+        List<org.apache.hadoop.hive.metastore.api.Partition> partsToAlter = new ArrayList<>();
+        List<String> part_names = new ArrayList<>();
         for (org.apache.hadoop.hive.metastore.api.Partition p: in){
           part_names.add(Warehouse.makePartName(tbl.getPartitionKeys(), p.getValues()));
+          try {
+            org.apache.hadoop.hive.metastore.api.Partition ptn =
+                getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues());
+            if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn)){
+              partsToAlter.add(p);
+            } // else ptn already exists, but we do nothing with it.
+          } catch (NoSuchObjectException nsoe){
+            // if the object does not exist, we want to add it.
+            partsToAdd.add(p);
+          }
         }
+        for (org.apache.hadoop.hive.metastore.api.Partition outPart
+            : getMSC().add_partitions(partsToAdd, addPartitionDesc.isIfNotExists(), true)) {
+          out.add(new Partition(tbl, outPart));
+        }
+        getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), partsToAlter, null);
+
         for ( org.apache.hadoop.hive.metastore.api.Partition outPart :
         getMSC().getPartitionsByNames(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),part_names)){
           out.add(new Partition(tbl,outPart));

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 3420efd..ce952c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -204,6 +204,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     // Executed if relevant, and used to contain all the other details about the table if not.
     CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
 
+    if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){
+      tblDesc.setReplicationSpec(replicationSpec);
+    }
+
     if (isExternalSet){
       tblDesc.setExternal(isExternalSet);
       // This condition-check could have been avoided, but to honour the old
@@ -368,8 +372,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private static Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc,
-      EximUtil.SemanticAnalyzerWrapperContext x) {
+      EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
     tableDesc.setReplaceMode(true);
+    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+      tableDesc.setReplicationSpec(replicationSpec);
+    }
     return TaskFactory.get(new DDLWork(
         x.getInputs(),
         x.getOutputs(),
@@ -383,6 +390,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
       EximUtil.SemanticAnalyzerWrapperContext x) {
     addPartitionDesc.setReplaceMode(true);
+    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+      addPartitionDesc.setReplicationSpec(replicationSpec);
+    }
     addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
     return TaskFactory.get(new DDLWork(
         x.getInputs(),
@@ -860,6 +870,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (!replicationSpec.isMetadataOnly()) {
         if (isPartitioned(tblDesc)) {
           for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+            addPartitionDesc.setReplicationSpec(replicationSpec);
             t.addDependentTask(
                 addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
           }
@@ -881,7 +892,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (table.isPartitioned()) {
         x.getLOG().debug("table partitioned");
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-
+          addPartitionDesc.setReplicationSpec(replicationSpec);
           Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
 
@@ -912,7 +923,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){
           // MD-ONLY table alter
-          x.getTasks().add(alterTableTask(tblDesc, x));
+          x.getTasks().add(alterTableTask(tblDesc, x,replicationSpec));
           if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
             lockType = WriteEntity.WriteType.DDL_SHARED;
           }
@@ -925,7 +936,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (!replicationSpec.isMetadataOnly()) {
           loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
         } else {
-          x.getTasks().add(alterTableTask(tblDesc, x));
+          x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
         }
         if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
           lockType = WriteEntity.WriteType.DDL_SHARED;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 938355e..8007c4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -25,25 +28,26 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.io.IOUtils;
 
+import javax.annotation.Nullable;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -63,12 +67,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private String dbNameOrPattern;
   // Table name or pattern
   private String tblNameOrPattern;
-  private Integer eventFrom;
-  private Integer eventTo;
+  private Long eventFrom;
+  private Long eventTo;
   private Integer batchSize;
   // Base path for REPL LOAD
   private String path;
 
+  private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
+
   public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
   }
@@ -114,13 +120,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       } else {
         // TOK_FROM subtree
         Tree fromNode = ast.getChild(currNode);
-        eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
+        eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
         // skip the first, which is always required
         int numChild = 1;
         while (numChild < fromNode.getChildCount()) {
           if (fromNode.getChild(numChild).getType() == TOK_TO) {
             eventTo =
-                Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+                Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
             // skip the next child, since we already took care of it
             numChild++;
           } else if (fromNode.getChild(numChild).getType() == TOK_BATCH) {
@@ -142,38 +148,212 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   // REPL DUMP
   private void analyzeReplDump(ASTNode ast) throws SemanticException {
-    // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize
     LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
         + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
         + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, getNextDumpDir());
+    Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata"); // written only by the event-dump branch
     try {
-      for (String dbName : matchesDb(dbNameOrPattern)) {
-        LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
-        Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
-        for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
-          LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
-              + " to db root " + dbRoot.toUri());
-          dumpTbl(ast, dbName, tblName, dbRoot);
+      if (eventFrom == null){
+        // bootstrap case: no starting event id, so dump every matching db and table object
+        String bootDumpBeginReplId =
+            String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+        for (String dbName : matchesDb(dbNameOrPattern)) {
+          LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+          Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+          for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
+            LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
+                + " to db root " + dbRoot.toUri());
+            dumpTbl(ast, dbName, tblName, dbRoot);
+          }
+        }
+        String bootDumpEndReplId =
+            String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+        LOG.info("Bootstrap object dump phase took from {} to {}",bootDumpBeginReplId, bootDumpEndReplId);
+
+        // Now that bootstrap has dumped all objects related, we have to account for the changes
+        // that occurred while bootstrap was happening - i.e. we have to look through all events
+        // during the bootstrap period and consolidate them with our dump.
+
+        IMetaStoreClient.NotificationFilter evFilter =
+            EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern);
+        EventUtils.MSClientNotificationFetcher evFetcher =
+            new EventUtils.MSClientNotificationFetcher(db.getMSC());
+        EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+            evFetcher, Long.valueOf(bootDumpBeginReplId),
+            Ints.checkedCast(Long.valueOf(bootDumpEndReplId) - Long.valueOf(bootDumpBeginReplId) + 1),
+            evFilter );
+
+        // Now we consolidate all the events that happened during the objdump into the objdump
+        while (evIter.hasNext()){
+          NotificationEvent ev = evIter.next();
+          Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+          // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
         }
+        LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId);
+        prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), bootDumpEndReplId),
+            "dump_dir,last_repl_id#string,string");
+      } else {
+        // get list of events matching dbPattern & tblPattern
+        // go through each event, and dump out each event to a event-level dump dir inside dumproot
+        if (eventTo == null){
+          eventTo = db.getMSC().getCurrentNotificationEventId().getEventId();
+          LOG.debug("eventTo not specified, using current event id : {}", eventTo);
+        }
+
+        // Size of the closed id range [eventFrom, eventTo]; Ints.checkedCast throws
+        // IllegalArgumentException when the value does not fit in an int.
+        Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
+        // Keep a batch size that was already set, clamped to the range, and default to the
+        // whole range when none was given. batchSize must not be reassigned before this
+        // check, otherwise the null branch is dead and the requested value is discarded.
+        if ((batchSize == null) || (batchSize > maxRange)){
+          batchSize = maxRange;
+        }
+
+        IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter(
+            EventUtils.getDbTblNotificationFilter(dbNameOrPattern,tblNameOrPattern),
+            EventUtils.getEventBoundaryFilter(eventFrom, eventTo));
+
+        EventUtils.MSClientNotificationFetcher evFetcher
+            = new EventUtils.MSClientNotificationFetcher(db.getMSC());
+
+        EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+            evFetcher, eventFrom, batchSize, evFilter);
+
+        while (evIter.hasNext()){
+          NotificationEvent ev = evIter.next();
+          Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+          dumpEvent(ev,evRoot);
+        }
+
+        LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo);
+        // Record the dumped event range in the _dumpmetadata file under the dump root.
+        writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata);
+        prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(eventTo)),
+            "dump_dir,last_repl_id#string,string");
       }
-      String currentReplId =
-          String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
-      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
-          "dump_dir,last_repl_id#string,string");
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
+      LOG.warn("Error during analyzeReplDump",e);
       throw new SemanticException(e);
     }
   }
 
+  /**
+   * Dumps a single notification event under evRoot. Table/partition metadata is exported via
+   * EximUtil.createExportDump right away; the data copy is only queued as a task on rootTasks.
+   * Only CREATE_TABLE and ADD_PARTITION events are handled, all other types are logged and skipped.
+   */
+  private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
+    long evid = ev.getEventId();
+    String evidStr = String.valueOf(evid);
+    ReplicationSpec replicationSpec = getNewReplicationSpec(evidStr, evidStr);
+    MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); // NOTE(review): not used below
+    switch (ev.getEventType()){
+      case MessageFactory.CREATE_TABLE_EVENT : {
+        LOG.info("Processing#{} CREATE_TABLE message : {}",ev.getEventId(),ev.getMessage());
+        // FIXME : Current MessageFactory api is lacking,
+        // and impl is in JSONMessageFactory instead. This needs to be
+        // refactored correctly so we don't depend on a specific impl.
+        org.apache.hadoop.hive.metastore.api.Table tobj =
+            JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev));
+        if (tobj == null){
+          LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed", evid);
+          break;
+        }
+
+        Table qlMdTable = new Table(tobj);
+
+        Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+        EximUtil.createExportDump(
+            metaDataPath.getFileSystem(conf),
+            metaDataPath,
+            qlMdTable,
+            null,
+            replicationSpec);
+
+        // FIXME : dump _files should happen at dbnotif time, doing it here is incorrect
+        // we will, however, do so here, now, for dev/debug's sake.
+        Path dataPath = new Path(evRoot,"data");
+        rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath , conf));
+        break;
+      }
+      case MessageFactory.ADD_PARTITION_EVENT : {
+        LOG.info("Processing#{} ADD_PARTITION message : {}",ev.getEventId(),ev.getMessage());
+        // FIXME : same JSONMessageFactory dependency as noted in the CREATE_TABLE case above.
+        List<org.apache.hadoop.hive.metastore.api.Partition> ptnObjs =
+            JSONMessageFactory.getPartitionObjList(JSONMessageFactory.getJsonTree(ev));
+        if ((ptnObjs == null) || (ptnObjs.size() == 0)) {
+          LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions", evid);
+          break;
+        }
+        org.apache.hadoop.hive.metastore.api.Table tobj =
+            JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev));
+        if (tobj == null){
+          LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed", evid);
+          break;
+        }
+
+        final Table qlMdTable = new Table(tobj);
+        List<Partition> qlPtns = Lists.transform( // lazy view: apply() runs again on each traversal
+            ptnObjs,
+            new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
+              @Nullable
+              @Override
+              public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
+                if (input == null){
+                  return null;
+                }
+                try {
+                  return new Partition(qlMdTable,input);
+                } catch (HiveException e) {
+                  throw new IllegalArgumentException(e);
+                }
+              }
+            }
+        );
+
+        Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+        EximUtil.createExportDump(
+            metaDataPath.getFileSystem(conf),
+            metaDataPath,
+            qlMdTable,
+            qlPtns,
+            replicationSpec);
+
+        // FIXME : dump _files should ideally happen at dbnotif time, doing it here introduces
+        // rubberbanding. But, till we have support for that, this is our closest equivalent
+        for (Partition qlPtn : qlPtns){
+          Path ptnDataPath = new Path(evRoot,qlPtn.getName());
+          rootTasks.add(ReplCopyTask.getDumpCopyTask(
+              replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf));
+        }
+
+        break;
+      }
+      default:
+        LOG.info("Skipping processing#{} message : {}",ev.getEventId(), ev.getMessage());
+        // TODO : handle other event types
+        break;
+    }
+  }
+
+  public static void injectNextDumpDirForTest(String dumpdir){ // test-only hook
+    testInjectDumpDir = dumpdir; // returned by getNextDumpDir() when HIVE_IN_TEST is set; null restores "next"
+  }
+
   String getNextDumpDir() {
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
-      return "next";
-      // make it easy to write unit tests, instead of unique id generation.
+      // make it easy to write .q unit tests, instead of unique id generation.
       // however, this does mean that in writing tests, we have to be aware that
       // repl dump will clash with prior dumps, and thus have to clean up properly.
+      if (testInjectDumpDir == null){
+        return "next";
+      } else {
+        return testInjectDumpDir;
+      }
     } else {
       return String.valueOf(System.currentTimeMillis());
       // TODO: time good enough for now - we'll likely improve this.
@@ -259,14 +439,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     // looking at each db, and then each table, and then setting up the appropriate
     // import job in its place.
 
-    // FIXME : handle non-bootstrap cases.
-
-    // We look at the path, and go through each subdir.
-    // Each subdir corresponds to a database.
-    // For each subdir, there is a _metadata file which allows us to re-impress the db object
-    // After each db object is loaded appropriately, iterate through the sub-table dirs, and pretend
-    // that we had an IMPORT on each of them, into this db.
-
     try {
 
       Path loadPath = new Path(path);
@@ -277,25 +449,39 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new FileNotFoundException(loadPath.toUri().toString());
       }
 
-      // Now, the dumped path can be one of two things:
+      // Now, the dumped path can be one of three things:
       // a) It can be a db dump, in which case we expect a set of dirs, each with a
       // db name, and with a _metadata file in each, and table dirs inside that.
       // b) It can be a table dump dir, in which case we expect a _metadata dump of
       // a table in question in the dir, and individual ptn dir hierarchy.
-      // Once we expand this into doing incremental repl, we can have individual events which can
-      // be other things like roles and fns as well. Also, if tblname is specified, we're guaranteed
-      // that this is a tbl-level dump, and it is an error condition if we find anything else. Also,
-      // if dbname is specified, we expect exactly one db dumped, and having more is an error
-      // condition.
+      // c) It can be an event-level dump, in which case we expect several subdirs,
+      // each of which has the event id as its dir name and corresponds to a single
+      // event-level dump. Currently, only CREATE_TABLE and ADD_PARTITION are
+      // handled, so all of these dumps will be at a table/ptn level.
+
+      // For incremental repl, eventually, we can have individual events which can
+      // be other things like roles and fns as well.
+
+      boolean evDump = false;
+      Path dumpMetadata = new Path(loadPath, "_dumpmetadata");
+      // TODO : only event dumps currently have _dumpmetadata - this might change. Generify.
+      if (fs.exists(dumpMetadata)){
+        LOG.debug("{} exists, this is a event dump", dumpMetadata);
+        evDump = true;
+      } else {
+        LOG.debug("{} does not exist, this is an object dump", dumpMetadata);
+      }
 
-      if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+      if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+        // not an event dump, and table name pattern specified, this has to be a tbl-level dump
         analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
         return;
       }
 
       FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
       if (srcs == null || (srcs.length == 0)) {
-        throw new FileNotFoundException(loadPath.toUri().toString());
+        LOG.warn("Nothing to load at {}",loadPath.toUri().toString());
+        return;
       }
 
       FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
@@ -304,19 +490,31 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
       }
 
-      if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
-        LOG.debug("Found multiple dirs when we expected 1:");
-        for (FileStatus d : dirsInLoadPath) {
-          LOG.debug("> " + d.getPath().toUri().toString());
+      if (!evDump){
+        // not an event dump, not a table dump - thus, a db dump
+        if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
+          LOG.debug("Found multiple dirs when we expected 1:");
+          for (FileStatus d : dirsInLoadPath) {
+            LOG.debug("> " + d.getPath().toUri().toString());
+          }
+          throw new IllegalArgumentException(
+              "Multiple dirs in "
+                  + loadPath.toUri().toString()
+                  + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
         }
-        throw new IllegalArgumentException(
-            "Multiple dirs in "
-                + loadPath.toUri().toString()
-                + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
-      }
 
-      for (FileStatus dir : dirsInLoadPath) {
-        analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+        for (FileStatus dir : dirsInLoadPath) {
+          analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+        }
+      } else {
+        // event dump, each subdir is an individual event dump.
+        for (FileStatus dir : dirsInLoadPath){
+          // event loads will behave similar to table loads, with one crucial difference
+          // precursor order is strict, and each event must be processed after the previous one.
+          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
+          analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), null);
+          // FIXME: we should have a strict order of execution so that each event's tasks occur linearly
+        }
       }
 
     } catch (Exception e) {
@@ -326,6 +524,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   }
 
+  private void analyzeEventLoad(String dbNameOrPattern, String tblNameOrPattern,
+      FileSystem fs, FileStatus dir) throws SemanticException {
+    // Empty placeholder: within this patch, event dirs are loaded via analyzeTableLoad instead.
+  }
+
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
       throws SemanticException {
     try {
@@ -486,11 +689,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     // If we do call it, then FetchWork thinks that the "table" here winds up thinking that
     // this is a partitioned dir, which does not work. Thus, this does not work.
 
-    writeOutput(values);
+    writeOutput(values,ctx.getResFile());
   }
 
-  private void writeOutput(List<String> values) throws SemanticException {
-    Path outputFile = ctx.getResFile();
+  private void writeOutput(List<String> values, Path outputFile) throws SemanticException {
     FileSystem fs = null;
     DataOutputStream outStream = null;
     try {
@@ -499,7 +701,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
       for (int i = 1; i < values.size(); i++) {
         outStream.write(Utilities.ctrlaCode);
-        outStream.writeBytes((values.get(1) == null ? Utilities.nullStringOutput : values.get(1)));
+        outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
       }
       outStream.write(Utilities.newLineCode);
     } catch (IOException e) {
@@ -513,17 +715,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private ReplicationSpec getNewReplicationSpec() throws SemanticException {
     try {
-      ReplicationSpec replicationSpec =
-          new ReplicationSpec(true, false, "replv2", "will-be-set", false, true);
-      replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC()
+      ReplicationSpec rspec = getNewReplicationSpec("replv2","will-be-set"); // current state is set just below
+      rspec.setCurrentReplicationState(String.valueOf(db.getMSC()
           .getCurrentNotificationEventId().getEventId()));
-      return replicationSpec;
+      return rspec;
     } catch (Exception e) {
-      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
-                                      // codes
+      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error codes
     }
   }
 
+  private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
+    return new ReplicationSpec(true, false, evState, objState, false, true); // NOTE(review): boolean flags are positional ctor args, meanings not visible here
+  }
+
   private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
       throws HiveException {
     if (tblPattern == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 824cf11..060f2a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -179,6 +179,14 @@ public class ReplicationSpec {
   }
 
   /**
+   * Determines if the current replication object (i.e. the current state of the dump) is
+   * allowed to replicate-replace-into a given partition.
+   */
+  public boolean allowReplacementInto(org.apache.hadoop.hive.metastore.api.Partition ptn){ // ptn's stored state vs. ours
+    return allowReplacement(getLastReplicatedStateFromParameters(ptn.getParameters()),this.getCurrentReplicationState());
+  }
+
+  /**
    * Determines if a current replication event specification is allowed to
    * replicate-replace-into a given partition
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
index 7a583c3..6ffd94a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
@@ -25,12 +25,14 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 
 /**
  * Contains the information needed to add one or more partitions.
  */
 public class AddPartitionDesc extends DDLDesc implements Serializable {
 
+
   public static class OnePartitionDesc {
     public OnePartitionDesc() {}
 
@@ -152,6 +154,7 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
   boolean ifNotExists;
   List<OnePartitionDesc> partitions = null;
   boolean replaceMode = false;
+  private ReplicationSpec replicationSpec = null;
 
 
   /**
@@ -302,4 +305,23 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
   public boolean getReplaceMode() {
     return this.replaceMode;
   }
+
+  /**
+   * @param replicationSpec the replication spec governing this add-partition.
+   * This parameter will have meaningful values only for adds happening as a result of a replication.
+   */
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  /**
+   * @return the replication scope this add-partition runs under ("CREATE/REPLACE IF NEWER THAN").
+   */
+  public ReplicationSpec getReplicationSpec() {
+    if (this.replicationSpec != null) {
+      return this.replicationSpec;
+    }
+    this.replicationSpec = new ReplicationSpec(); // lazily default so callers never see null
+    return this.replicationSpec;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index 60858e6..4f614a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -91,6 +92,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
   boolean isTemporary = false;
   private boolean isMaterialization = false;
   private boolean replaceMode = false;
+  private ReplicationSpec replicationSpec = null;
   private boolean isCTAS = false;
   List<SQLPrimaryKey> primaryKeys;
   List<SQLForeignKey> foreignKeys;
@@ -646,6 +648,25 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
     return replaceMode;
   }
 
+  /**
+   * @param replicationSpec the replication spec governing this create.
+   * This parameter will have meaningful values only for creates happening as a result of a replication.
+   */
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  /**
+   * @return the replication scope this create runs under ("CREATE/REPLACE IF NEWER THAN").
+   */
+  public ReplicationSpec getReplicationSpec() {
+    if (this.replicationSpec != null) {
+      return this.replicationSpec;
+    }
+    this.replicationSpec = new ReplicationSpec(); // lazily default so callers never see null
+    return this.replicationSpec;
+  }
+
   public boolean isCTAS() {
     return isCTAS;
   }