Posted to commits@hive.apache.org by vg...@apache.org on 2016/12/20 22:48:15 UTC

hive git commit: HIVE-15466: REPL LOAD & DUMP support for incremental DROP_TABLE/DROP_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)

Repository: hive
Updated Branches:
  refs/heads/master 5efd20c7f -> ab9b21920


HIVE-15466: REPL LOAD & DUMP support for incremental DROP_TABLE/DROP_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ab9b2192
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ab9b2192
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ab9b2192

Branch: refs/heads/master
Commit: ab9b21920c0b624822850e32197e7d6575fd5fb4
Parents: 5efd20c
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Tue Dec 20 14:47:55 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Tue Dec 20 14:47:55 2016 -0800

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 116 +++++++
 .../messaging/json/JSONMessageFactory.java      |   6 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |   2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 347 +++++++++++++++++--
 4 files changed, 435 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
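For context, a minimal sketch of the round trip this patch targets, mirroring the new testDrops() case in TestReplicationScenarios below. The database/table names and the bracketed values are illustrative placeholders, not output of this commit:

  -- bootstrap dump and load; REPL DUMP returns <dump_dir> and <last_repl_id>
  REPL DUMP src_db;
  REPL LOAD src_db_dupe FROM '<dump_dir>';

  -- drops on the source generate DROP_TABLE / DROP_PARTITION notification events
  DROP TABLE src_db.unptned;
  ALTER TABLE src_db.ptned DROP PARTITION (b='2');

  -- incremental dump of the events since <last_repl_id>, then replay on the destination
  REPL DUMP src_db FROM <last_repl_id>;
  REPL LOAD src_db_dupe FROM '<incremental_dump_dir>';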


http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 3ac5ba7..d2696be 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -21,11 +21,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.Shell;
+import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -41,6 +45,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 public class TestReplicationScenarios {
 
@@ -54,6 +60,7 @@ public class TestReplicationScenarios {
   static boolean useExternalMS = false;
   static int msPort;
   static Driver driver;
+  static HiveMetaStoreClient metaStoreClient;
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private ArrayList<String> lastResults;
@@ -91,6 +98,7 @@ public class TestReplicationScenarios {
 
     driver = new Driver(hconf);
     SessionState.start(new CliSessionState(hconf));
+    metaStoreClient = new HiveMetaStoreClient(hconf);
   }
 
   @AfterClass
@@ -285,6 +293,114 @@ public class TestReplicationScenarios {
     verifyResults(ptn_data_1);
     run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2");
     verifyResults(ptn_data_2);
+
+  }
+
+  @Test
+  public void testDrops() throws IOException {
+
+    String testName = "drops";
+    LOG.info("Testing "+testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+    run("SELECT a from " + dbName + ".ptned WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+    run("SELECT a from " + dbName + ".ptned WHERE b='2'");
+    verifyResults(ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='2'");
+    verifyResults(ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
+
+    run("SELECT * from " + dbName + "_dupe.unptned");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'");
+    verifyResults(ptn_data_2);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'");
+    verifyResults(ptn_data_2);
+
+    run("DROP TABLE " + dbName + ".unptned");
+    run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+    run("DROP TABLE " + dbName + ".ptned2");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + ".ptned");
+    verifyResults(ptn_data_1);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String postDropReplDumpLocn = getResult(0,0);
+    String postDropReplDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+    Exception e = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+      assertNull(tbl);
+    } catch (TException te) {
+      e = te;
+    }
+    assertNotNull(e);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + "_dupe.ptned");
+    verifyResults(ptn_data_1);
+
+    Exception e2 = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+      assertNull(tbl);
+    } catch (TException te) {
+      e2 = te;
+    }
+    assertNotNull(e2);
+    assertEquals(NoSuchObjectException.class, e2.getClass());
+
   }
 
   private String getResult(int rowNum, int colNum) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index c57f577..17e7686 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -221,7 +221,11 @@ public class JSONMessageFactory extends MessageFactory {
   }
 
   public static ObjectNode getJsonTree(NotificationEvent event) throws Exception {
-    JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage());
+    return getJsonTree(event.getMessage());
+  }
+
+  public static ObjectNode getJsonTree(String eventMessage) throws Exception {
+    JsonParser jsonParser = (new JsonFactory()).createJsonParser(eventMessage);
     ObjectMapper mapper = new ObjectMapper();
     return mapper.readValue(jsonParser, ObjectNode.class);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 050522f..0f472e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -3132,7 +3132,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
     return result;
   }
 
-  private static ExprNodeGenericFuncDesc makeBinaryPredicate(
+  public static ExprNodeGenericFuncDesc makeBinaryPredicate(
       String fn, ExprNodeDesc left, ExprNodeDesc right) throws SemanticException {
       return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
           FunctionRegistry.getFunctionInfo(fn).getGenericUDF(), Lists.newArrayList(left, right));

http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 8725015..69ccda7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -27,13 +27,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -45,18 +49,28 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
 
 import javax.annotation.Nullable;
+import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,6 +91,122 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
+  public static final String DUMPMETADATA = "_dumpmetadata";
+
+  public enum DUMPTYPE {
+    BOOTSTRAP("BOOTSTRAP"),
+    INCREMENTAL("INCREMENTAL"),
+    EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"),
+    EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"),
+    EVENT_DROP_TABLE("EVENT_DROP_TABLE"),
+    EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
+    EVENT_UNKNOWN("EVENT_UNKNOWN");
+
+    String type = null;
+    DUMPTYPE(String type) {
+      this.type = type;
+    }
+
+    @Override
+    public String toString(){
+      return type;
+    }
+
+  };
+
+  public class DumpMetaData {
+    // wrapper class for reading and writing metadata about a dump
+    // responsible for _dumpmetadata files
+
+    private DUMPTYPE dumpType;
+    private Long eventFrom = null;
+    private Long eventTo = null;
+    private String payload = null;
+    private boolean initialized = false;
+
+    private final Path dumpRoot;
+    private final Path dumpFile;
+
+    public DumpMetaData(Path dumpRoot) {
+      this.dumpRoot = dumpRoot;
+      dumpFile = new Path(dumpRoot, DUMPMETADATA);
+    }
+
+    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){
+      this(dumpRoot);
+      setDump(lvl,eventFrom,eventTo);
+    }
+
+    public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){
+      this.dumpType = lvl;
+      this.eventFrom = eventFrom;
+      this.eventTo = eventTo;
+      this.initialized = true;
+    }
+
+    public void loadDumpFromFile() throws SemanticException {
+      try {
+        // read from dumpfile and instantiate self
+        FileSystem fs = dumpFile.getFileSystem(conf);
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
+        String line = null;
+        if ( (line = br.readLine()) != null){
+          String[] lineContents = line.split("\t",4);
+          setDump(DUMPTYPE.valueOf(lineContents[0]),Long.valueOf(lineContents[1]),Long.valueOf(lineContents[2]));
+          setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? null : lineContents[3]);
+        } else {
+          throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString());
+        }
+      } catch (IOException ioe){
+        throw new SemanticException(ioe);
+      }
+    }
+
+    public DUMPTYPE getDumpType() throws SemanticException {
+      initializeIfNot();
+      return this.dumpType;
+    }
+
+    public String getPayload() throws SemanticException {
+      initializeIfNot();
+      return this.payload;
+    }
+
+    public void setPayload(String payload) {
+      this.payload = payload;
+    }
+
+    public Long getEventFrom() throws SemanticException {
+      initializeIfNot();
+      return eventFrom;
+    }
+
+    public Long getEventTo() throws SemanticException {
+      initializeIfNot();
+      return eventTo;
+    }
+
+    public Path getDumpFilePath() {
+      return dumpFile;
+    }
+
+    public boolean isIncrementalDump() throws SemanticException {
+      initializeIfNot();
+      return (this.dumpType == DUMPTYPE.INCREMENTAL);
+    }
+
+    private void initializeIfNot() throws SemanticException {
+      if (!initialized){
+        loadDumpFromFile();
+      }
+    }
+
+    public void write() throws SemanticException {
+      writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), payload), dumpFile);
+    }
+
+  }
+
   public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
   }
@@ -155,13 +285,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, getNextDumpDir());
-    Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata");
-    String lastReplId;
+    DumpMetaData dmd = new DumpMetaData(dumpRoot);
+    Long lastReplId;
     try {
       if (eventFrom == null){
         // bootstrap case
-        String bootDumpBeginReplId =
-            String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+        Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
         for (String dbName : matchesDb(dbNameOrPattern)) {
           LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
           Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
@@ -171,8 +300,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
             dumpTbl(ast, dbName, tblName, dbRoot);
           }
         }
-        String bootDumpEndReplId =
-            String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+        Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
         LOG.info("Bootstrap object dump phase took from {} to {}",bootDumpBeginReplId, bootDumpEndReplId);
 
         // Now that bootstrap has dumped all objects related, we have to account for the changes
@@ -184,8 +312,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         EventUtils.MSClientNotificationFetcher evFetcher =
             new EventUtils.MSClientNotificationFetcher(db.getMSC());
         EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
-            evFetcher, Long.valueOf(bootDumpBeginReplId),
-            Ints.checkedCast(Long.valueOf(bootDumpEndReplId) - Long.valueOf(bootDumpBeginReplId) + 1),
+            evFetcher, bootDumpBeginReplId,
+            Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
             evFilter );
 
         // Now we consolidate all the events that happenned during the objdump into the objdump
@@ -194,7 +322,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
           // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
         }
-        LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId);
+        LOG.info("Consolidation done, preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+        dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId);
+        dmd.write();
+
         // Set the correct last repl id to return to the user
         lastReplId = bootDumpEndReplId;
       } else {
@@ -230,13 +361,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           dumpEvent(ev,evRoot);
         }
 
-        LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo);
-        List<String> vals;
-        writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata);
+        LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo);
+        writeOutput(Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)), dmd.getDumpFilePath());
+        dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo);
+        dmd.write();
         // Set the correct last repl id to return to the user
-        lastReplId = String.valueOf(eventTo);
+        lastReplId = eventTo;
       }
-      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), lastReplId), dumpSchema);
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
       setFetchTask(createFetchTask(dumpSchema));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
@@ -248,7 +380,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
     long evid = ev.getEventId();
     String evidStr = String.valueOf(evid);
-    ReplicationSpec replicationSpec = getNewReplicationSpec(evidStr, evidStr);
+    ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr);
     MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
     switch (ev.getEventType()){
       case MessageFactory.CREATE_TABLE_EVENT : {
@@ -278,7 +410,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         // we will, however, do so here, now, for dev/debug's sake.
         Path dataPath = new Path(evRoot,"data");
         rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath , conf));
-
+        (new DumpMetaData(evRoot,DUMPTYPE.EVENT_CREATE_TABLE,evid,evid)).write();
         break;
       }
       case MessageFactory.ADD_PARTITION_EVENT : {
@@ -334,11 +466,29 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
               replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf));
         }
 
+        (new DumpMetaData(evRoot,DUMPTYPE.EVENT_ADD_PARTITION,evid,evid)).write();
+        break;
+      }
+      case MessageFactory.DROP_TABLE_EVENT : {
+        LOG.info("Processing#{} DROP_TABLE message : {}",ev.getEventId(),ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_TABLE,evid,evid);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
+        break;
+      }
+      case MessageFactory.DROP_PARTITION_EVENT : {
+        LOG.info("Processing#{} DROP_PARTITION message : {}",ev.getEventId(),ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_PARTITION,evid,evid);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
         break;
       }
+      // TODO : handle other event types
       default:
-        LOG.info("Skipping processing#{} message : {}",ev.getEventId(), ev.getMessage());
-        // TODO : handle other event types
+        LOG.info("Dummy processing#{} message : {}",ev.getEventId(), ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_UNKNOWN,evid,evid);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
         break;
     }
 
@@ -426,14 +576,44 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   /*
    * Example dump dirs we need to be able to handle :
    *
-   * for: hive.repl.rootdir = staging/ Then, repl dumps will be created in staging/<dumpdir>
+   * for: hive.repl.rootdir = staging/
+   * Then, repl dumps will be created in staging/<dumpdir>
    *
-   * single-db-dump: staging/blah12345 blah12345/ default/ _metadata tbl1/ _metadata dt=20160907/
-   * _files tbl2/ tbl3/ unptn_tbl/ _metadata _files
+   * single-db-dump: staging/blah12345 will contain a db dir for the db specified
+   *  blah12345/
+   *   default/
+   *    _metadata
+   *    tbl1/
+   *      _metadata
+   *      dt=20160907/
+   *        _files
+   *    tbl2/
+   *    tbl3/
+   *    unptn_tbl/
+   *      _metadata
+   *      _files
    *
-   * multi-db-dump: staging/bar12347 staging/ bar12347/ default/ ... sales/ ...
+   * multi-db-dump: staging/bar12347 will contain dirs for each db covered
+   * staging/
+   *  bar12347/
+   *   default/
+   *     ...
+   *   sales/
+   *     ...
    *
-   * single table-dump: staging/baz123 staging/ baz123/ _metadata dt=20150931/ _files
+   * single table-dump: staging/baz123 will contain a table object dump inside
+   * staging/
+   *  baz123/
+   *    _metadata
+   *    dt=20150931/
+   *      _files
+   *
+   * incremental dump : staging/blue123 will contain dirs for each event inside.
+   * staging/
+   *  blue123/
+   *    34/
+   *    35/
+   *    36/
    */
   private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
@@ -458,22 +638,24 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // db name, and with a _metadata file in each, and table dirs inside that.
       // b) It can be a table dump dir, in which case we expect a _metadata dump of
       // a table in question in the dir, and individual ptn dir hierarchy.
-      // c) A dump can be an event-level dump, which means we have several subdirs
+      // c) A dump can be an incremental dump, which means we have several subdirs
       // each of which have the evid as the dir name, and each of which correspond
       // to a event-level dump. Currently, only CREATE_TABLE and ADD_PARTITION are
       // handled, so all of these dumps will be at a table/ptn level.
 
-      // For incremental repl, eventually, we can have individual events which can
+      // For incremental repl, we will have individual events which can
       // be other things like roles and fns as well.
+      // At this point, all dump dirs should contain a _dumpmetadata file that
+      // tells us what is inside that dumpdir.
+
+      DumpMetaData dmd = new DumpMetaData(loadPath);
 
       boolean evDump = false;
-      Path dumpMetadata = new Path(loadPath, "_dumpmetadata");
-      // TODO : only event dumps currently have _dumpmetadata - this might change. Generify.
-      if (fs.exists(dumpMetadata)){
-        LOG.debug("{} exists, this is a event dump", dumpMetadata);
+      if (dmd.isIncrementalDump()){
+        LOG.debug("{} contains an incremental dump", loadPath);
         evDump = true;
       } else {
-        LOG.debug("{} does not exist, this is an object dump", dumpMetadata);
+        LOG.debug("{} contains an bootstrap dump", loadPath);
       }
 
       if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
@@ -533,6 +715,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           //                \               /
           //                 --->ev1.task3--
           //
+          // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
+          // entire chain
+
           List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
               dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), taskChainTail);
           LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
@@ -562,10 +747,98 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private List<Task<? extends Serializable>> analyzeEventLoad(
       String dbName, String tblName, String locn,
-      Task<? extends  Serializable> precursor ) throws SemanticException {
+      Task<? extends Serializable> precursor) throws SemanticException {
     // Currently handles only create-tbl & insert-ptn, since only those are dumped
     // As we add more event types, this will expand.
-    return analyzeTableLoad(dbName, tblName, locn, precursor);
+    DumpMetaData dmd = new DumpMetaData(new Path(locn));
+    switch (dmd.getDumpType()) {
+      case EVENT_CREATE_TABLE: {
+        return analyzeTableLoad(dbName, tblName, locn, precursor);
+      }
+      case EVENT_ADD_PARTITION: {
+        return analyzeTableLoad(dbName, tblName, locn, precursor);
+      }
+      case EVENT_DROP_TABLE: {
+        MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
+        DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload());
+        DropTableDesc dropTableDesc = new DropTableDesc(
+            dbName + "." + (tblName == null ? dropTableMessage.getTable() : tblName),
+            null, true, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+        Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf);
+        if (precursor != null){
+          precursor.addDependentTask(dropTableTask);
+        }
+        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+        tasks.add(dropTableTask);
+        LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+        return tasks;
+      }
+      case EVENT_DROP_PARTITION: {
+        MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
+        DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
+        Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = genPartSpecs(dropPartitionMessage.getPartitions());
+        if (partSpecs.size() > 0){
+          DropTableDesc dropPtnDesc = new DropTableDesc(
+              dbName + "." + (tblName == null ? dropPartitionMessage.getTable() : tblName), partSpecs,
+              null, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+          Task<DDLWork> dropPtnTask = TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
+          if (precursor != null){
+            precursor.addDependentTask(dropPtnTask);
+          }
+          List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+          tasks.add(dropPtnTask);
+          LOG.debug("Added drop ptn task : {}:{},{}",
+              dropPtnTask.getId(), dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
+          return tasks;
+        } else {
+          throw new SemanticException("DROP PARTITION EVENT does not return any part descs for event message :"+dmd.getPayload());
+        }
+      }
+      case EVENT_UNKNOWN: {
+        break;
+      }
+      default: {
+        break;
+      }
+    }
+    return null;
+  }
+
+  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(List<Map<String, String>> partitions) throws SemanticException {
+    Map<Integer,List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<Integer,List<ExprNodeGenericFuncDesc>>();
+
+    int partPrefixLength = 0;
+    if ((partitions != null) && (partitions.size() > 0)) {
+      partPrefixLength = partitions.get(0).size();
+      // pick the length of the first ptn, we expect all ptns listed to have the same number of key-vals.
+    }
+    List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>();
+    for (Map<String,String> ptn : partitions) {
+      // convert each key-value-map to appropriate expression.
+
+      ExprNodeGenericFuncDesc expr = null;
+      for (Map.Entry<String,String> kvp : ptn.entrySet()) {
+        String key = kvp.getKey();
+        Object val = kvp.getValue();
+        // FIXME : bug here, value is being placed as a String, but should actually be the underlying type
+        // as converted to it by looking at the table's col schema. To do that, however, we need the
+        // tableObjJson from the DropTableMessage. So, currently, this will only work for partitions for
+        // which the partition keys are all strings. So, for now, we hardcode it, but we need to fix this.
+        String type = "string";
+
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate("=", column, new ExprNodeConstantDesc(pti, val));
+        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+      }
+      if (expr != null) {
+        ptnDescs.add(expr);
+      }
+    }
+    if (ptnDescs.size() > 0){
+      partSpecs.put(partPrefixLength,ptnDescs);
+    }
+    return partSpecs;
   }
 
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
@@ -709,8 +982,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
-    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}" ,
-        String.valueOf(replLastId),ctx.getResFile());
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
+        String.valueOf(replLastId), ctx.getResFile());
   }
 
   private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
@@ -752,10 +1025,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
+  // Use for specifying object state as well as event state
   private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
     return new ReplicationSpec(true, false, evState, objState, false, true);
   }
 
+  // Use for replication states focused on event only, where the obj state will be the event state
+  private ReplicationSpec getNewEventOnlyReplicationSpec(String evState) throws SemanticException {
+    return getNewReplicationSpec(evState,evState);
+  }
+
   private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
       throws HiveException {
     if (tblPattern == null) {