Posted to commits@hive.apache.org by th...@apache.org on 2017/08/16 04:14:59 UTC

hive git commit: HIVE-16990 : REPL LOAD should update last repl ID only after successful copy of data files. (Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master 34b0e07a3 -> 60ae30ad2


HIVE-16990 : REPL LOAD should update last repl ID only after successful copy of data files. (Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/60ae30ad
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/60ae30ad
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/60ae30ad

Branch: refs/heads/master
Commit: 60ae30ad217687cc2ad90c5297d7048a5bbb53b6
Parents: 34b0e07
Author: Sankar Hariappan <ma...@gmail.com>
Authored: Tue Aug 15 21:14:53 2017 -0700
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Tue Aug 15 21:14:53 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/parse/TestReplicationScenarios.java |  86 +++++----
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   2 +-
 .../hadoop/hive/ql/exec/ReplCopyTask.java       |   3 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   9 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  77 ++++----
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 192 +++++++++++--------
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |  48 ++---
 .../dump/BootStrapReplicationSpecFunction.java  |   6 +-
 .../parse/repl/dump/io/PartitionSerializer.java |  10 +-
 .../ql/parse/repl/dump/io/TableSerializer.java  |  14 +-
 .../parse/repl/load/UpdatedMetaDataTracker.java |  76 ++++++++
 .../load/message/AbstractMessageHandler.java    |  17 +-
 .../repl/load/message/AddForeignKeyHandler.java |   3 +-
 .../message/AddNotNullConstraintHandler.java    |   3 +-
 .../repl/load/message/AddPrimaryKeyHandler.java |   3 +-
 .../message/AddUniqueConstraintHandler.java     |   3 +-
 .../load/message/CreateFunctionHandler.java     |   3 +-
 .../load/message/DropConstraintHandler.java     |   3 +-
 .../repl/load/message/DropFunctionHandler.java  |   2 +-
 .../repl/load/message/DropPartitionHandler.java |   3 +-
 .../repl/load/message/DropTableHandler.java     |   2 +-
 .../parse/repl/load/message/InsertHandler.java  |   3 +-
 .../parse/repl/load/message/MessageHandler.java |   6 +-
 .../load/message/RenamePartitionHandler.java    |   3 +-
 .../repl/load/message/RenameTableHandler.java   |   5 +-
 .../parse/repl/load/message/TableHandler.java   |   3 +-
 .../load/message/TruncatePartitionHandler.java  |   3 +-
 .../repl/load/message/TruncateTableHandler.java |   3 +-
 .../hadoop/hive/ql/plan/AlterDatabaseDesc.java  |   2 +-
 29 files changed, 340 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
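
At its core, the fix chains every repl.last.id update behind a barrier task that
depends on all import (DDL/COPY/MOVE) tasks, so a failed data copy leaves the
replica's state untouched and the identical load can simply be re-run. A condensed
sketch of that ordering, pieced together from the ReplicationSemanticAnalyzer hunks
below (the method name chainReplStateUpdate is illustrative only; the real logic is
addUpdateReplStateTasks):

    // Not standalone code: Task, TaskFactory, DDLWork, DependencyCollectionWork,
    // AlterDatabaseDesc and ReplicationSpec are the Hive ql classes touched below.
    private List<Task<? extends Serializable>> chainReplStateUpdate(
        List<Task<? extends Serializable>> importTasks, String dbName, String replState) {
      // Barrier: runs only after every import task has succeeded.
      Task<? extends Serializable> barrier =
          TaskFactory.get(new DependencyCollectionWork(), conf);
      for (Task<? extends Serializable> t : importTasks) {
        t.addDependentTask(barrier);
      }
      // Only behind the barrier do we stamp repl.last.id on the database.
      HashMap<String, String> props = new HashMap<>();
      props.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
      AlterDatabaseDesc alterDb =
          new AlterDatabaseDesc(dbName, props, new ReplicationSpec(replState, replState));
      Task<? extends Serializable> update =
          TaskFactory.get(new DDLWork(inputs, outputs, alterDb), conf);
      barrier.addDependentTask(update);
      return Collections.singletonList(update);
    }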


http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b020351..146fb37 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFil
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
@@ -298,7 +297,6 @@ public class TestReplicationScenarios {
     String[] unptn_data = new String[]{ "eleven" , "twelve" };
     String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
-    String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
     String[] empty = new String[]{};
 
     String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
@@ -309,7 +307,6 @@ public class TestReplicationScenarios {
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
     createTestDataFile(ptn_locn_2, ptn_data_2);
-    createTestDataFile(ptn_locn_2_later, ptn_data_2_later);
 
     run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
     run("SELECT * from " + dbName + ".unptned", driver);
@@ -332,15 +329,9 @@ public class TestReplicationScenarios {
 
     // Table dropped after "repl dump"
     run("DROP TABLE " + dbName + ".unptned", driver);
+
    // Partition dropped after "repl dump"
     run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver);
-    // File changed after "repl dump"
-    Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2");
-    Path loc = new Path(p.getSd().getLocation());
-    FileSystem fs = loc.getFileSystem(hconf);
-    Path file = fs.listStatus(loc)[0].getPath();
-    fs.delete(file, false);
-    fs.copyFromLocalFile(new Path(ptn_locn_2_later), file);
 
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
     printOutput(driverMirror);
@@ -353,10 +344,8 @@ public class TestReplicationScenarios {
     verifyResults(unptn_data, driverMirror);
     run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", driverMirror);
     verifyResults(ptn_data_1, driverMirror);
-    // Since partition(b=2) changed manually, Hive cannot find
-    // it in original location and cmroot, thus empty
     run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", driverMirror);
-    verifyResults(empty, driverMirror);
+    verifyResults(ptn_data_2, driverMirror);
     run("SELECT a from " + dbName + ".ptned_empty", driverMirror);
     verifyResults(empty, driverMirror);
     run("SELECT * from " + dbName + ".unptned_empty", driverMirror);
@@ -1281,6 +1270,7 @@ public class TestReplicationScenarios {
 
     String[] ptn_data = new String[]{ "ten"};
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data[0] + "')", driver);
+    run("DROP TABLE " + dbName + ".ptned", driver);
 
    // Inject a behaviour where it throws an exception if an INSERT event is found
    // As we dynamically add a partition through the INSERT INTO command, it should just add ADD_PARTITION
@@ -1300,6 +1290,7 @@ public class TestReplicationScenarios {
             if (event.getDbName().equalsIgnoreCase(dbName)) {
              if (event.getEventType().equals("INSERT")) {
                // If an insert event is found, return null so that no event is dumped.
+                LOG.error("Encountered INSERT event when it was not expected");
                 return null;
               }
             }
@@ -1316,7 +1307,7 @@ public class TestReplicationScenarios {
     eventTypeValidator.assertInjectionsPerformed(true,false);
     InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
 
-    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror);
+    verifyIfTableNotExist(replDbName , "ptned", metaStoreClientMirror);
   }
 
   @Test
@@ -2546,32 +2537,58 @@ public class TestReplicationScenarios {
 
    // Replicate all the events that happened so far
     Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
-    verifySetup("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
-    verifySetup("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
   }
 
   @Test
-  public void testStatus() throws IOException {
-    // first test ReplStateMap functionality
-    Map<String,Long> cmap = new ReplStateMap<String,Long>();
+  public void testIncrementalLoadFailAndRetry() throws IOException {
+    String testName = "incrementalLoadFailAndRetry";
+    String dbName = createDB(testName, driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
 
-    Long oldV;
-    oldV = cmap.put("a",1L);
-    assertEquals(1L,cmap.get("a").longValue());
-    assertEquals(null,oldV);
+    // Bootstrap dump/load
+    String replDbName = dbName + "_dupe";
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
 
-    cmap.put("b",2L);
-    oldV = cmap.put("b",-2L);
-    assertEquals(2L, cmap.get("b").longValue());
-    assertEquals(2L, oldV.longValue());
+    // Prefixed with incrementalLoadFailAndRetry to avoid finding entry in cmpath
+    String[] ptn_data_1 = new String[] { "incrementalLoadFailAndRetry_fifteen" };
+    String[] empty = new String[] {};
 
-    cmap.put("c",3L);
-    oldV = cmap.put("c",33L);
-    assertEquals(33L, cmap.get("c").longValue());
-    assertEquals(3L, oldV.longValue());
+    run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver);
+    run("CREATE TABLE " + dbName + ".ptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver);
 
-    // Now, to actually test status - first, we bootstrap.
+    // Move the data files of this newly created partition to a temp location
+    Partition ptn = null;
+    try {
+      ptn = metaStoreClient.getPartition(dbName, "ptned", new ArrayList<>(Arrays.asList("1")));
+    } catch (Exception e) {
+      assert(false);
+    }
+
+    Path ptnLoc = new Path(ptn.getSd().getLocation());
+    Path tmpLoc = new Path(TEST_PATH + "/incrementalLoadFailAndRetry");
+    FileSystem dataFs = ptnLoc.getFileSystem(hconf);
+    assert(dataFs.rename(ptnLoc, tmpLoc));
+
+    // Replicate all the events that happened so far. It should fail as the data files are
+    // missing from the original path and not available in CM either.
+    Tuple incrDump = replDumpDb(dbName, bootstrapDump.lastReplId, null, null);
+    verifyFail("REPL LOAD " + replDbName + " FROM '" + incrDump.dumpLocation + "'", driverMirror);
+
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
+    verifyFail("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", driverMirror);
 
+    // Move the files back to original data location
+    assert(dataFs.rename(tmpLoc, ptnLoc));
+    loadAndVerify(replDbName, incrDump.dumpLocation, incrDump.lastReplId);
+
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+  }
+
+  @Test
+  public void testStatus() throws IOException {
     String name = testName.getMethodName();
     String dbName = createDB(name, driver);
     advanceDumpDir();
@@ -3000,8 +3017,9 @@ public class TestReplicationScenarios {
     } catch (AssertionError ae){
       LOG.warn("AssertionError:",ae);
       throw new RuntimeException(ae);
+    } catch (Exception e) {
+      success = false;
     }
-
     assertFalse(success);
   }
 
@@ -3034,7 +3052,7 @@ public class TestReplicationScenarios {
     boolean success = false;
     try {
       CommandProcessorResponse ret = myDriver.run(cmd);
-      success = (ret.getException() == null);
+      success = ((ret.getException() == null) && (ret.getErrorMessage() == null));
       if (!success){
         LOG.warn("Error {} : {} running [{}].", ret.getErrorCode(), ret.getErrorMessage(), cmd);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 3d69d19..364db27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3646,7 +3646,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       if (allPartitions == null) {
         db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), alterTbl.getEnvironmentContext());
       } else {
-        db.alterPartitions(tbl.getTableName(), allPartitions, alterTbl.getEnvironmentContext());
+        db.alterPartitions(alterTbl.getOldName(), allPartitions, alterTbl.getEnvironmentContext());
       }
       // Add constraints if necessary
       addConstraints(db, alterTbl);

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 07f9167..54746d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -165,8 +165,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
                 .getPath();
         filePaths.add(f);
       } catch (MetaException e) {
-        // skip and issue warning for missing file
+        // issue warning for missing file and throw exception
         LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot");
+        throw new IOException(e.getMessage());
       }
       // Note - we need srcFs rather than fs, because it is possible that the _files lists files
       // which are from a different filesystem than the fs where the _files file itself was loaded
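
This one-line behavioural change is what makes the retry contract possible: a data
file that is absent from both the source repo and cmroot now aborts the copy task
(and with it the whole REPL LOAD) instead of being skipped with a warning, so the
repl state update tasks chained behind the barrier never run. The new failure path,
restated from the hunk above with the propagation spelled out:

    try {
      // Resolve the file against the source repo, falling back to cmroot.
      filePaths.add(f);
    } catch (MetaException e) {
      // Previously skip-and-warn; now the missing file fails the task, leaving
      // repl.last.id on the replica unchanged so the load can be retried.
      LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot");
      throw new IOException(e.getMessage());
    }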

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 05fc5e4..67a67fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -166,7 +166,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   }
 
   private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
-    return getNewReplicationSpec(eventId.toString(), eventId.toString());
+    ReplicationSpec rspec = getNewReplicationSpec(eventId.toString(), eventId.toString());
+    rspec.setIsIncrementalDump(true);
+    return rspec;
   }
 
   private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
@@ -251,12 +253,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private ReplicationSpec getNewReplicationSpec() throws TException {
     ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
     rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC()
-        .getCurrentNotificationEventId().getEventId()));
+            .getCurrentNotificationEventId().getEventId()));
     return rspec;
   }
 
   private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
-    return new ReplicationSpec(true, false, evState, objState, false, true, true);
+    return new ReplicationSpec(true, false, false, evState, objState,
+                               false, true, true);
   }
 
   private String getNextDumpDir() {

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 37edd5c..606a414 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -59,10 +59,10 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -143,7 +143,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
           parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
           new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
-          null, null);
+          null);
 
     } catch (SemanticException e) {
       throw e;
@@ -180,7 +180,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       String parsedLocation, String parsedTableName, String parsedDbName,
       LinkedHashMap<String, String> parsedPartSpec,
       String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
-      Map<String,Long> dbsUpdated, Map<String,Long> tablesUpdated
+      UpdatedMetaDataTracker updatedMetadata
   ) throws IOException, MetaException, HiveException, URISyntaxException {
 
     // initialize load path
@@ -200,6 +200,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     ReplicationSpec replicationSpec = rv.getReplicationSpec();
     if (replicationSpec.isNoop()){
       // nothing to do here, silently return.
+      x.getLOG().debug("Current update with ID:{} is noop",
+                                  replicationSpec.getCurrentReplicationState());
       return false;
     }
 
@@ -208,11 +210,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       // If the parsed statement contained a db.tablename specification, prefer that.
       dbname = parsedDbName;
     }
-    if (dbsUpdated != null){
-      dbsUpdated.put(
-          dbname,
-          Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID)));
-    }
 
     // Create table associated with the import
     // Executed if relevant, and used to contain all the other details about the table if not.
@@ -223,7 +220,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       throw new HiveException(e);
     }
 
-    if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){
+    if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
       tblDesc.setReplicationSpec(replicationSpec);
     }
 
@@ -242,11 +239,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
       tblDesc.setTableName(parsedTableName);
     }
-    if (tablesUpdated != null){
-      tablesUpdated.put(
-          dbname + "." + tblDesc.getTableName(),
-          Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID)));
-    }
 
     List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
     Iterable<Partition> partitions = rv.getPartitions();
@@ -305,8 +297,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     } else {
       createReplImportTasks(
           tblDesc, partitionDescs,
-          isPartSpecSet, replicationSpec, waitOnPrecursor, table,
-          fromURI, fs, wh, x);
+          replicationSpec, waitOnPrecursor, table,
+          fromURI, fs, wh, x, updatedMetadata);
     }
     return tableExists;
   }
@@ -357,14 +349,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }
 
-  private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){
-    return TaskFactory.get(new DDLWork(
-        x.getInputs(),
-        x.getOutputs(),
-        new DropTableDesc(table.getTableName(), null, true, true, null)
-    ), x.getConf());
-  }
-
   private static Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc,
       EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
     tableDesc.setReplaceMode(true);
@@ -790,12 +774,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static void createReplImportTasks(
       ImportTableDesc tblDesc,
       List<AddPartitionDesc> partitionDescs,
-      boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
+      ReplicationSpec replicationSpec, boolean waitOnPrecursor,
       Table table, URI fromURI, FileSystem fs, Warehouse wh,
-      EximUtil.SemanticAnalyzerWrapperContext x)
+      EximUtil.SemanticAnalyzerWrapperContext x,
+      UpdatedMetaDataTracker updatedMetadata)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
-    Task dr = null;
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
 
     // Normally, on import, trying to create a table or a partition in a db that does not yet exist
@@ -813,16 +797,27 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     if (table != null) {
       if (!replicationSpec.allowReplacementInto(table.getParameters())) {
         // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
+        x.getLOG().info("Table {}.{} is not replaced as it is newer than the update",
+                tblDesc.getDatabaseName(), tblDesc.getTableName());
         return;
       }
     } else {
       // If table doesn't exist, allow creating a new one only if the database state is older than the update.
       if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb.getParameters()))) {
        // If the database state is newer or same as current update based on repl.last.id, then just noop it.
+        x.getLOG().info("Table {}.{} is not created as the database is newer than the update",
+                tblDesc.getDatabaseName(), tblDesc.getTableName());
         return;
       }
     }
 
+    if (updatedMetadata != null) {
+      updatedMetadata.set(replicationSpec.getReplicationState(),
+                          tblDesc.getDatabaseName(),
+                          tblDesc.getTableName(),
+                          null);
+    }
+
     if (tblDesc.getLocation() == null) {
       if (!waitOnPrecursor){
         tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString());
@@ -847,8 +842,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
        behave like a noop or a pure MD alter.
     */
     if (table == null) {
-      // Either we're dropping and re-creating, or the table didn't exist, and we're creating.
-
       if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
         lockType = WriteEntity.WriteType.DDL_SHARED;
       }
@@ -862,20 +855,17 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             addPartitionDesc.setReplicationSpec(replicationSpec);
             t.addDependentTask(
                 addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+            if (updatedMetadata != null) {
+              updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+            }
           }
         } else {
           x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
           t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x));
         }
       }
-      if (dr == null){
-        // Simply create
-        x.getTasks().add(t);
-      } else {
-        // Drop and recreate
-        dr.addDependentTask(t);
-        x.getTasks().add(dr);
-      }
+      // Simply create
+      x.getTasks().add(t);
     } else {
       // Table existed, and is okay to replicate into, not dropping and re-creating.
       if (table.isPartitioned()) {
@@ -889,6 +879,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
                   fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+              if (updatedMetadata != null) {
+                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+              }
             }
           } else {
             // If replicating, then the partition already existing means we need to replace, maybe, if
@@ -901,14 +894,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
                 x.getTasks().add(alterSinglePartition(
                     fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
               }
+              if (updatedMetadata != null) {
+                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+              }
               if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
                 lockType = WriteEntity.WriteType.DDL_SHARED;
               }
-            } else {
-              // ignore this ptn, do nothing, not an error.
             }
           }
-
         }
         if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){
           // MD-ONLY table alter
@@ -919,9 +912,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
       } else {
         x.getLOG().debug("table non-partitioned");
-        if (!replicationSpec.allowReplacementInto(table.getParameters())){
-          return; // silently return, table is newer than our replacement.
-        }
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
           loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x);
@@ -934,7 +924,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
     x.getOutputs().add(new WriteEntity(table,lockType));
-
   }
 
   public static boolean isPartitioned(ImportTableDesc tblDesc) {

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index d4fc340..3e2c513 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
@@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -328,9 +331,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
         int evstage = 0;
         int evIter = 0;
-        Long lastEvid = null;
-        Map<String,Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
-        Map<String,Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
 
         REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL",
                 (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
@@ -360,8 +360,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           String locn = dir.getPath().toUri().toString();
           DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
           List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
-              dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
-              dbsUpdated, tablesUpdated, eventDmd);
+              dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, eventDmd);
           evIter++;
           REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " +
                               "with ID: {}, Type: {}, Path: {}",
@@ -380,80 +379,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                 taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
             taskChainTail = barrierTask;
             evstage++;
-            lastEvid = dmd.getEventTo();
           }
         }
-
-        // Now, we need to update repl.last.id for the various parent objects that were updated.
-        // This update logic will work differently based on what "level" REPL LOAD was run on.
-        //  a) If this was a REPL LOAD at a table level, i.e. both dbNameOrPattern and
-        //     tblNameOrPattern were specified, then the table is the only thing we should
-        //     update the repl.last.id for.
-        //  b) If this was a db-level REPL LOAD, then we should update the db, as well as any
-        //     tables affected by partition level operations. (any table level ops will
-        //     automatically be updated as the table gets updated. Note - renames will need
-        //     careful handling.
-        //  c) If this was a wh-level REPL LOAD, then we should update every db for which there
-        //     were events occurring, as well as tables for which there were ptn-level ops
-        //     happened. Again, renames must be taken care of.
-        //
-        // So, what we're going to do is have each event load update dbsUpdated and tablesUpdated
-        // accordingly, but ignore updates to tablesUpdated & dbsUpdated in the case of a
-        // table-level REPL LOAD, using only the table itself. In the case of a db-level REPL
-        // LOAD, we ignore dbsUpdated, but inject our own, and do not ignore tblsUpdated.
-        // And for wh-level, we do no special processing, and use all of dbsUpdated and
-        // tblsUpdated as-is.
-
-        // Additional Note - although this var says "dbNameOrPattern", on REPL LOAD side,
-        // we do not support a pattern. It can be null or empty, in which case
-        // we re-use the existing name from the dump, or it can be specified,
-        // in which case we honour it. However, having this be a pattern is an error.
-        // Ditto for tblNameOrPattern.
-
-
-        if (evstage > 0){
-          if ((tblNameOrPattern != null) && (!tblNameOrPattern.isEmpty())){
-            // if tblNameOrPattern is specified, then dbNameOrPattern will be too, and
-            // thus, this is a table-level REPL LOAD - only table needs updating.
-            // If any of the individual events logged any other dbs as having changed,
-            // null them out.
-            dbsUpdated.clear();
-            tablesUpdated.clear();
-            tablesUpdated.put(dbNameOrPattern + "." + tblNameOrPattern, lastEvid);
-          } else  if ((dbNameOrPattern != null) && (!dbNameOrPattern.isEmpty())){
-            // if dbNameOrPattern is specified and tblNameOrPattern isn't, this is a
-            // db-level update, and thus, the database needs updating. In addition.
-            dbsUpdated.clear();
-            dbsUpdated.put(dbNameOrPattern, lastEvid);
-          }
-        }
-
-        for (String tableName : tablesUpdated.keySet()){
-          // weird - AlterTableDesc requires a HashMap to update props instead of a Map.
-          HashMap<String, String> mapProp = new HashMap<>();
-          String eventId = tablesUpdated.get(tableName).toString();
-
-          mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
-          AlterTableDesc alterTblDesc =  new AlterTableDesc(
-              AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(eventId, eventId));
-          alterTblDesc.setProps(mapProp);
-          alterTblDesc.setOldName(tableName);
-          Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-              new DDLWork(inputs, outputs, alterTblDesc), conf);
-          taskChainTail.addDependentTask(updateReplIdTask);
-          taskChainTail = updateReplIdTask;
-        }
-        for (String dbName : dbsUpdated.keySet()){
-          Map<String, String> mapProp = new HashMap<>();
-          String eventId = dbsUpdated.get(dbName).toString();
-
-          mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
-          AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(eventId, eventId));
-          Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-              new DDLWork(inputs, outputs, alterDbDesc), conf);
-          taskChainTail.addDependentTask(updateReplIdTask);
-          taskChainTail = updateReplIdTask;
-        }
         rootTasks.add(evTaskRoot);
         REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " +
                             "(DDL/COPY/MOVE) tasks",
@@ -465,12 +392,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
     }
-
   }
 
   private List<Task<? extends Serializable>> analyzeEventLoad(
-      String dbName, String tblName, String location, Task<? extends Serializable> precursor,
-      Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd)
+      String dbName, String tblName,
+      String location,
+      Task<? extends Serializable> precursor,
+      DumpMetaData dmd)
       throws SemanticException {
     MessageHandler.Context context =
         new MessageHandler.Context(dbName, tblName, location, precursor, dmd, conf, db, ctx, LOG);
@@ -484,10 +412,110 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
             precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
       }
     }
-    dbsUpdated.putAll(messageHandler.databasesUpdated());
-    tablesUpdated.putAll(messageHandler.tablesUpdated());
     inputs.addAll(messageHandler.readEntities());
     outputs.addAll(messageHandler.writeEntities());
+    return addUpdateReplStateTasks(StringUtils.isEmpty(tblName),
+                            messageHandler.getUpdatedMetadata(), tasks);
+  }
+
+  private Task<? extends Serializable> tableUpdateReplStateTask(
+                                                        String dbName,
+                                                        String tableName,
+                                                        Map<String, String> partSpec,
+                                                        String replState,
+                                                        Task<? extends Serializable> preCursor) {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterTableDesc alterTblDesc =  new AlterTableDesc(
+            AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
+    alterTblDesc.setProps(mapProp);
+    alterTblDesc.setOldName(dbName + "." + tableName);
+    alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
+
+    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+                      new DDLWork(inputs, outputs, alterTblDesc), conf);
+
+    // Link the update repl state task with dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      LOG.debug("Added {}:{} as a precursor of {}:{}",
+              preCursor.getClass(), preCursor.getId(),
+              updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  private Task<? extends Serializable> dbUpdateReplStateTask(
+                                                        String dbName,
+                                                        String replState,
+                                                        Task<? extends Serializable> preCursor) {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
+                            dbName, mapProp, new ReplicationSpec(replState, replState));
+    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+                            new DDLWork(inputs, outputs, alterDbDesc), conf);
+
+    // Link the update repl state task with dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      LOG.debug("Added {}:{} as a precursor of {}:{}",
+              preCursor.getClass(), preCursor.getId(),
+              updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
+          boolean isDatabaseLoad,
+          UpdatedMetaDataTracker updatedMetadata,
+          List<Task<? extends Serializable>> importTasks) {
+    String replState = updatedMetadata.getReplicationState();
+    String dbName = updatedMetadata.getDatabase();
+    String tableName = updatedMetadata.getTable();
+
+    // If the event generated no import tasks, or no table was updated for a table-level load,
+    // then there is no need to update the repl state on any object.
+    if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
+      LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+      return importTasks;
+    }
+
+    // Create a barrier task for dependency collection of import tasks
+    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+
+    // Link import tasks to the barrier task, which will in turn be linked with the repl state update tasks
+    for (Task<? extends Serializable> t : importTasks){
+      t.addDependentTask(barrierTask);
+      LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+              t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+    }
+
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    Task<? extends Serializable> updateReplIdTask;
+
+    // If any partition is updated, then update repl state in partition object
+    for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
+      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    if (tableName != null) {
+      // If any table/partition is updated, then update repl state in table object
+      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    // For table level load, need not update replication state for the database
+    if (isDatabaseLoad) {
+      // If any table/partition is updated, then update repl state in db object
+      updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    // At least one task would have been added to update the repl state
     return tasks;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 4badea6..1c54d29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
 import javax.annotation.Nullable;
 import java.text.Collator;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -39,6 +38,7 @@ public class ReplicationSpec {
 
   private boolean isInReplicationScope = false; // default is that it's not in a repl scope
   private boolean isMetadataOnly = false; // default is full export/import, not metadata-only
+  private boolean isIncrementalDump = false; // default is replv2 bootstrap dump or replv1 export or import/load.
   private String eventId = null;
   private String currStateId = null;
   private boolean isNoop = false;
@@ -71,34 +71,6 @@ public class ReplicationSpec {
   static private Collator collator = Collator.getInstance();
 
   /**
-   * Class that extends HashMap with a slightly different put semantic, where
-   * put behaves as follows:
-   *  a) If the key does not already exist, then retains existing HashMap.put behaviour
-   *  b) If the map already contains an entry for the given key, then will replace only
-   *     if the new value is "greater" than the old value.
-   *
-   * The primary goal for this is to track repl updates for dbs and tables, to replace state
-   * only if the state is newer.
-   */
-  public static class ReplStateMap<K,V extends Comparable> extends HashMap<K,V> {
-    @Override
-    public V put(K k, V v){
-      if (!containsKey(k)){
-        return super.put(k,v);
-      }
-      V oldValue = get(k);
-      if (v.compareTo(oldValue) > 0){
-        return super.put(k,v);
-      }
-      // we did no replacement, but return the old value anyway. This
-      // seems most consistent with HashMap behaviour, because the "put"
-      // was effectively processed and consumed, although we threw away
-      // the new value.
-      return oldValue;
-    }
-  }
-
-  /**
    * Constructor to construct spec based on either the ASTNode that
    * corresponds to the replication clause itself, or corresponds to
    * the parent node, and will scan through the children to instantiate
@@ -134,14 +106,16 @@ public class ReplicationSpec {
   }
 
   public ReplicationSpec(String fromId, String toId) {
-    this(true, false, fromId, toId, false, true, false);
+    this(true, false, false, fromId, toId, false, true, false);
   }
 
   public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
+                         boolean isIncrementalDump,
                          String eventReplicationState, String currentReplicationState,
                          boolean isNoop, boolean isLazy, boolean isReplace) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
+    this.isIncrementalDump = isIncrementalDump;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
@@ -151,8 +125,9 @@ public class ReplicationSpec {
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
     String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
-    this.isMetadataOnly = false;
     this.isInReplicationScope = false;
+    this.isMetadataOnly = false;
+    this.isIncrementalDump = false;
     if (scope != null) {
       if (scope.equalsIgnoreCase("metadata")) {
         this.isMetadataOnly = true;
@@ -258,6 +233,17 @@ public class ReplicationSpec {
   }
 
   /**
+   * @return true if this statement refers to incremental dump operation.
+   */
+  public boolean isIncrementalDump(){
+    return isIncrementalDump;
+  }
+
+  public void setIsIncrementalDump(boolean isIncrementalDump){
+    this.isIncrementalDump = isIncrementalDump;
+  }
+
+  /**
    * @return true if this statement refers to metadata-only operation.
    */
   public boolean isMetadataOnly(){

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
index ae37c73..6a05ea4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
@@ -32,18 +32,20 @@ class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<Rep
   @Override
   public ReplicationSpec fromMetaStore() throws HiveException {
     try {
+      long currentNotificationId = db.getMSC()
+              .getCurrentNotificationEventId().getEventId();
       ReplicationSpec replicationSpec =
           new ReplicationSpec(
               true,
               false,
+              false,
               "replv2",
               "will-be-set",
               false,
               true,
               false
           );
-      long currentNotificationId = db.getMSC()
-          .getCurrentNotificationEventId().getEventId();
+
       replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId));
       return replicationSpec;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 077d39b..2c7414f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -42,9 +42,13 @@ public class PartitionSerializer implements JsonWriter.Serializer {
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     try {
       if (additionalPropertiesProvider.isInReplicationScope()) {
-        partition.putToParameters(
-            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
-            additionalPropertiesProvider.getCurrentReplicationState());
+        // Current replication state must be set on the Partition object only for bootstrap dump.
+        // Event replication state will be null in case of bootstrap dump.
+        if (!additionalPropertiesProvider.isIncrementalDump()) {
+          partition.putToParameters(
+                  ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+                  additionalPropertiesProvider.getCurrentReplicationState());
+        }
         if (isPartitionExternal()) {
           // Replication destination will not be external
           partition.putToParameters("EXTERNAL", "FALSE");

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 948cb39..c443e53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -68,17 +68,21 @@ public class TableSerializer implements JsonWriter.Serializer {
   private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
     if (additionalPropertiesProvider.isInReplicationScope()) {
-      table.putToParameters(
-            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
-            additionalPropertiesProvider.getCurrentReplicationState());
+      // Current replication state must be set on the Table object only for bootstrap dump.
+      // Event replication state will be null in case of bootstrap dump.
+      if (!additionalPropertiesProvider.isIncrementalDump()) {
+        table.putToParameters(
+                ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+                additionalPropertiesProvider.getCurrentReplicationState());
+      }
       if (isExternalTable(table)) {
           // Replication destination will not be external - override if set
         table.putToParameters("EXTERNAL", "FALSE");
-        }
+      }
       if (isExternalTableType(table)) {
           // Replication dest will not be external - override if set
         table.setTableType(TableType.MANAGED_TABLE.toString());
-        }
+      }
     } else {
       // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
       // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
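
PartitionSerializer and TableSerializer now share the same guard: an incremental
dump must not stamp repl.last.id into the dumped metadata, because on the load side
that ID is applied only after the data copy succeeds. The guard, condensed from the
two hunks above (partition.putToParameters is the analogous call for partitions):

    if (additionalPropertiesProvider.isInReplicationScope()
        && !additionalPropertiesProvider.isIncrementalDump()) {
      // Bootstrap dump only: stamp the current replication state on the object.
      table.putToParameters(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
          additionalPropertiesProvider.getCurrentReplicationState());
    }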

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
new file mode 100644
index 0000000..5714b21
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Utility class to help track and return the metadata that is updated by repl load.
+ */
+public class UpdatedMetaDataTracker {
+  private String replState;
+  private String dbName;
+  private String tableName;
+  private List<Map <String, String>> partitionsList;
+
+  public UpdatedMetaDataTracker() {
+    this.replState = null;
+    this.dbName = null;
+    this.tableName = null;
+    this.partitionsList = new ArrayList<>();
+  }
+
+  public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
+    this.replState = other.replState;
+    this.dbName = other.dbName;
+    this.tableName = other.tableName;
+    this.partitionsList = other.getPartitions();
+  }
+
+  public void set(String replState, String dbName, String tableName, Map <String, String> partSpec) {
+    this.replState = replState;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    if (partSpec != null) {
+      addPartition(partSpec);
+    }
+  }
+
+  public void addPartition(Map <String, String> partSpec) {
+    partitionsList.add(partSpec);
+  }
+
+  public String getReplicationState() {
+    return replState;
+  }
+
+  public String getDatabase() {
+    return dbName;
+  }
+
+  public String getTable() {
+    return tableName;
+  }
+
+  public List<Map <String, String>> getPartitions() {
+    return partitionsList;
+  }
+
+}
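
Message handlers feed this tracker in place of the removed ReplStateMap; typical
usage, as the constraint and partition handlers in this patch do:

    // Record the repl state plus the db/table an event updated
    // (partSpec is null for table-level events).
    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
    // Partition-level operations additionally record each partition spec.
    updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());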

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
index d6a95bf..5b26681 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -21,19 +21,15 @@ import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 abstract class AbstractMessageHandler implements MessageHandler {
   final HashSet<ReadEntity> readEntitySet = new HashSet<>();
   final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
-  final Map<String, Long> tablesUpdated = new HashMap<>(),
-      databasesUpdated = new HashMap<>();
+  final UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker();
   final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
 
   @Override
@@ -47,13 +43,6 @@ abstract class AbstractMessageHandler implements MessageHandler {
   }
 
   @Override
-  public Map<String, Long> tablesUpdated() {
-    return tablesUpdated;
-  }
-
-  @Override
-  public Map<String, Long> databasesUpdated() {
-    return databasesUpdated;
-  }
+  public UpdatedMetaDataTracker getUpdatedMetadata() { return updatedMetadata; }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
index 39697bb..0873c1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
@@ -64,8 +64,7 @@ public class AddForeignKeyHandler extends AbstractMessageHandler {
     List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(addConstraintsTask);    
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
index e2c1d1d..76cbe5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
@@ -65,8 +65,7 @@ public class AddNotNullConstraintHandler extends AbstractMessageHandler {
     List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(addConstraintsTask);    
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
index 7babb6a..aee46da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
@@ -62,8 +62,7 @@ public class AddPrimaryKeyHandler extends AbstractMessageHandler {
     List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(addConstraintsTask);    
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
index e7b404a..f0cb11e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
@@ -63,8 +63,7 @@ public class AddUniqueConstraintHandler extends AbstractMessageHandler {
     List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(addConstraintsTask);    
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index a6d35cf..3f176aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -67,7 +67,8 @@ public class CreateFunctionHandler extends AbstractMessageHandler {
       // bootstrap. There should be a better way to do this, but it might require a lot of changes
       // across the different handlers; unless this turns out to be a common pattern, leaving it here.
       if (context.dmd != null) {
-        databasesUpdated.put(builder.destinationDbName, context.dmd.getEventTo());
+        updatedMetadata.set(context.dmd.getEventTo().toString(), builder.destinationDbName,
+                            null, null);
       }
       readEntitySet.add(toReadEntity(new Path(context.location), context.hiveConf));
       if (builder.replCopyTasks.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
index 58aa214..459fac5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
@@ -44,8 +44,7 @@ public class DropConstraintHandler extends AbstractMessageHandler {
     List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(dropConstraintsTask);
     context.log.debug("Added drop constrain task : {}:{}", dropConstraintsTask.getId(), actualTblName);
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(dropConstraintsTask);    
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
index dae300f..fee2bb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
@@ -43,7 +43,7 @@ public class DropFunctionHandler extends AbstractMessageHandler {
     context.log.debug(
         "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName()
     );
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null);
     return Collections.singletonList(dropFunctionTask);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 771400e..5456416 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -58,8 +58,7 @@ public class DropPartitionHandler extends AbstractMessageHandler {
         );
         context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
             dropPtnDesc.getTableName(), msg.getPartitions());
-        databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-        tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+        updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
         return Collections.singletonList(dropPtnTask);
       } else {
         throw new SemanticException(

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
index 3ee3949..d2f5248 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -46,7 +46,7 @@ public class DropTableHandler extends AbstractMessageHandler {
     context.log.debug(
         "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()
     );
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null);
     return Collections.singletonList(dropTableTask);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
index 40ed0b2..d412fd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -40,8 +40,7 @@ public class InsertHandler extends AbstractMessageHandler {
     List<Task<? extends Serializable>> tasks = tableHandler.handle(currentContext);
     readEntitySet.addAll(tableHandler.readEntities());
     writeEntitySet.addAll(tableHandler.writeEntities());
-    databasesUpdated.putAll(tableHandler.databasesUpdated);
-    tablesUpdated.putAll(tableHandler.tablesUpdated);
+    getUpdatedMetadata().copyUpdatedMetadata(tableHandler.getUpdatedMetadata());
     return tasks;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index 33c716f..8daff6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.slf4j.Logger;
 
 import java.io.Serializable;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -42,9 +42,7 @@ public interface MessageHandler {
 
   Set<WriteEntity> writeEntities();
 
-  Map<String, Long> tablesUpdated();
-
-  Map<String, Long> databasesUpdated();
+  UpdatedMetaDataTracker getUpdatedMetadata();
 
   class Context {
     final String dbName, tableName, location;
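
With a single tracker behind the interface, the code that drives REPL LOAD (ReplicationSemanticAnalyzer, whose hunks are not part of this excerpt) has one source of truth for which object's last replication ID to advance, and can do so after the event's copy tasks instead of before them, which is the point of HIVE-16990. A hypothetical reading of the tracker, using the accessor names assumed in the sketch above rather than the committed code:

import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;

// Hypothetical consumer, not the committed ReplicationSemanticAnalyzer.
final class ReplStateTarget {
  // Names the object whose last repl ID should advance once every task
  // produced for this event, metadata and data copy alike, has succeeded.
  static String of(MessageHandler handler) {
    UpdatedMetaDataTracker updated = handler.getUpdatedMetadata();
    if (updated.getTable() == null) {
      // Database-level event (e.g. DROP TABLE, DROP FUNCTION).
      return updated.getDatabase();
    }
    // Table-level event; affected partitions, if any, come from getPartitions().
    return updated.getDatabase() + "." + updated.getTable();
  }
}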

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 5bd0532..43f2cbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -65,8 +65,7 @@ public class RenamePartitionHandler extends AbstractMessageHandler {
         new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
     context.log.debug("Added rename ptn task : {}:{}->{}",
                       renamePtnTask.getId(), oldPartSpec, newPartSpec);
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(tableName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
     return Collections.singletonList(renamePtnTask);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
index 4785e55..e30abad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -65,9 +65,8 @@ public class RenameTableHandler extends AbstractMessageHandler {
                         renameTableTask.getId(), oldName, newName);
 
       // oldDbName and newDbName *will* be the same if we're here
-      databasesUpdated.put(newDbName, context.dmd.getEventTo());
-      tablesUpdated.remove(oldName);
-      tablesUpdated.put(newName, context.dmd.getEventTo());
+      updatedMetadata.set(context.dmd.getEventTo().toString(), newDbName,
+              msg.getTableObjAfter().getTableName(), null);
 
       // Note: edge-case here in interaction with table-level REPL LOAD, where that nukes out
       // tablesUpdated. However, we explicitly don't support repl of that sort, and error out above

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index 65e1d6a..2c5c2d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -47,8 +47,7 @@ public class TableHandler extends AbstractMessageHandler {
       // Also, REPL LOAD doesn't support external tables, and hence no location is set either.
       ImportSemanticAnalyzer.prepareImport(false, false, false,
           (context.precursor != null), null, context.tableName, context.dbName,
-          null, context.location, x,
-          databasesUpdated, tablesUpdated);
+          null, context.location, x, updatedMetadata);
 
       return importTasks;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
index 3a8990a..b983f95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
@@ -62,8 +62,7 @@ public class TruncatePartitionHandler extends AbstractMessageHandler {
             context.hiveConf);
     context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(),
         truncateTableDesc.getTableName());
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec);
     return Collections.singletonList(truncatePtnTask);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
index 93ffa29..c6d7739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
@@ -44,8 +44,7 @@ public class TruncateTableHandler extends AbstractMessageHandler {
 
     context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(),
         truncateTableDesc.getTableName());
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-    tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(truncateTableTask);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
index 7410e5a..ca6f090 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  * AlterDatabaseDesc.
  *
  */
-@Explain(displayName = "Create Database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+@Explain(displayName = "Alter Database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class AlterDatabaseDesc extends DDLDesc implements Serializable {
 
   private static final long serialVersionUID = 1L;
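
The final hunk is a one-line copy-paste fix: AlterDatabaseDesc carried the display name of its create counterpart, so EXPLAIN labeled alter-database stages as "Create Database". The displayName in @Explain is the label EXPLAIN prints for the stage at the listed levels; purely for illustration, the same pattern on a hypothetical descriptor that does not exist in the tree:

import java.io.Serializable;
import org.apache.hadoop.hive.ql.plan.DDLDesc;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.Explain.Level;

// Hypothetical descriptor, shown only to illustrate that displayName, not the
// class name, is what surfaces in EXPLAIN output.
@Explain(displayName = "Archive Database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public class ArchiveDatabaseDesc extends DDLDesc implements Serializable {
  private static final long serialVersionUID = 1L;
}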