Posted to commits@hive.apache.org by th...@apache.org on 2017/05/30 06:37:12 UTC

hive git commit: HIVE-16727 : REPL DUMP for insert event shouldn't fail if the table is already dropped. (Sankar Hariappan via Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master bbf0629a5 -> 8dcc78a21


HIVE-16727 : REPL DUMP for insert event shouldn't fail if the table is already dropped. (Sankar Hariappan via Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8dcc78a2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8dcc78a2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8dcc78a2

Branch: refs/heads/master
Commit: 8dcc78a21488a5fe2ec9e42084a61bf38653ddd6
Parents: bbf0629
Author: Sankar Hariappan <ma...@gmail.com>
Authored: Mon May 29 23:37:07 2017 -0700
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Mon May 29 23:37:07 2017 -0700

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  11 +-
 .../listener/TestDbNotificationListener.java    |  28 +---
 .../hive/ql/parse/TestReplicationScenarios.java | 130 +++++++++++++++++++
 .../hive/metastore/events/InsertEvent.java      |  44 +++----
 .../hive/metastore/messaging/InsertMessage.java |  24 ++--
 .../metastore/messaging/MessageFactory.java     |  10 +-
 .../messaging/json/JSONInsertMessage.java       |  51 +++++---
 .../messaging/json/JSONMessageFactory.java      |  10 +-
 .../parse/repl/dump/events/InsertHandler.java   |  28 ++--
 9 files changed, 231 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
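At a glance, the fix changes the INSERT notification to embed the full Thrift Table and Partition objects (serialized as JSON) instead of only the db/table names and partition key-values, so REPL DUMP can reconstruct the metadata from the event itself even after the table has been dropped. A minimal consumer-side sketch of the new message API follows; the deserializer setup and the variable/class names are illustrative assumptions, not part of this patch:

import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

public class InsertEventConsumerSketch {
  // Reads an INSERT notification and recovers the table/partition metadata
  // directly from the message payload, with no metastore round trip.
  public static void describeInsert(NotificationEvent event) throws Exception {
    MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
    InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
    Table tbl = insertMsg.getTableObj();     // full Thrift Table object from the message
    Partition ptn = insertMsg.getPtnObj();   // null for unpartitioned tables
    StringBuilder target = new StringBuilder(tbl.getDbName() + "." + tbl.getTableName());
    if (ptn != null) {
      target.append(" partition ").append(ptn.getValues());
    }
    System.out.println("Insert into " + target + ", files: " + insertMsg.getFiles());
  }
}

Previously the dump side only had the db/table names from the message and had to look the table up in the metastore, which failed once the table was gone.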


http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 6f96e1d..e598a6b 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -437,13 +437,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
   }
   @Override
   public void onInsert(InsertEvent insertEvent) throws MetaException {
+    Table tableObj = insertEvent.getTableObj();
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
-            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
+        new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj,
+                insertEvent.getPartitionObj(), insertEvent.isReplace(),
             new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
-            .toString());
-    event.setDbName(insertEvent.getDb());
-    event.setTableName(insertEvent.getTable());
+                .toString());
+    event.setDbName(tableObj.getDbName());
+    event.setTableName(tableObj.getTableName());
     process(event, insertEvent);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 2168a67..808c9c7 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -1227,8 +1227,9 @@ public class TestDbNotificationListener {
     FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
     List<FieldSchema> partCols = new ArrayList<FieldSchema>();
     List<String> partCol1Vals = Arrays.asList("today");
-    LinkedHashMap<String, String> partKeyVals = new LinkedHashMap<String, String>();
-    partKeyVals.put("ds", "today");
+    List<String> partKeyVals = new ArrayList<String>();
+    partKeyVals.add("today");
+
     partCols.add(partCol1);
     Table table =
         new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
@@ -1264,9 +1265,9 @@ public class TestDbNotificationListener {
     // Parse the message field
     verifyInsert(event, defaultDbName, tblName);
     InsertMessage insertMessage = md.getInsertMessage(event.getMessage());
-    Map<String,String> partKeyValsFromNotif = insertMessage.getPartitionKeyValues();
+    List<String> ptnValues = insertMessage.getPtnObj().getValues();
 
-    assertMapEquals(partKeyVals, partKeyValsFromNotif);
+    assertEquals(partKeyVals, ptnValues);
 
     // Verify the eventID was passed to the non-transactional listener
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3);
@@ -1528,31 +1529,16 @@ public class TestDbNotificationListener {
     InsertMessage insertMsg = md.getInsertMessage(event.getMessage());
     System.out.println("InsertMessage: " + insertMsg.toString());
     if (dbName != null ){
-      assertEquals(dbName, insertMsg.getDB());
+      assertEquals(dbName, insertMsg.getTableObj().getDbName());
     }
     if (tblName != null){
-      assertEquals(tblName, insertMsg.getTable());
+      assertEquals(tblName, insertMsg.getTableObj().getTableName());
     }
     // Should have files
     Iterator<String> files = insertMsg.getFiles().iterator();
     assertTrue(files.hasNext());
   }
 
-
-  private void assertMapEquals(Map<String, String> map1, Map<String, String> map2) {
-    // non ordered, non-classed map comparison - use sparingly instead of assertEquals
-    // only if you're sure that the order does not matter.
-    if ((map1 == null) || (map2 == null)){
-      assertNull(map1);
-      assertNull(map2);
-    }
-    assertEquals(map1.size(),map2.size());
-    for (String k : map1.keySet()){
-      assertTrue(map2.containsKey(k));
-      assertEquals(map1.get(k), map2.get(k));
-    }
-  }
-
   @Test
   public void cleanupNotifs() throws Exception {
     Database db = new Database("cleanup1", "no description", "file:/tmp", emptyParameters);

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 21f09ae..766d858 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1282,6 +1282,136 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testIncrementalInsertDropUnpartitionedTable() throws IOException {
+    String testName = "incrementalInsertDropUnpartitionedTable";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".unptned");
+    verifySetup("SELECT a from " + dbName + ".unptned_tmp ORDER BY a", unptn_data);
+
+    // Get the last repl ID corresponding to all insert/alter/create events except DROP.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String lastDumpIdWithoutDrop = getResult(0, 1);
+
+    // Drop all the tables
+    run("DROP TABLE " + dbName + ".unptned");
+    run("DROP TABLE " + dbName + ".unptned_tmp");
+    verifyFail("SELECT * FROM " + dbName + ".unptned");
+    verifyFail("SELECT * FROM " + dbName + ".unptned_tmp");
+
+    // Dump all the events except DROP
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    // The tables and data should still be there, as the DROP events are not part of this dump
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_tmp ORDER BY a", unptn_data);
+
+    // Dump the DROP events and verify that the tables are dropped in the target as well
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyFail("SELECT * FROM " + dbName + ".unptned");
+    verifyFail("SELECT * FROM " + dbName + ".unptned_tmp");
+  }
+
+  @Test
+  public void testIncrementalInsertDropPartitionedTable() throws IOException {
+    String testName = "incrementalInsertDropPartitionedTable";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=20)");
+    run("ALTER TABLE " + dbName + ".ptned RENAME PARTITION (b=20) TO PARTITION (b=2");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+
+    run("CREATE TABLE " + dbName + ".ptned_tmp AS SELECT * FROM " + dbName + ".ptned");
+    verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=2) ORDER BY a", ptn_data_2);
+
+    // Get the last repl ID corresponding to all insert/alter/create events except DROP.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String lastDumpIdWithoutDrop = getResult(0, 1);
+
+    // Drop all the tables
+    run("DROP TABLE " + dbName + ".ptned_tmp");
+    run("DROP TABLE " + dbName + ".ptned");
+    verifyFail("SELECT * FROM " + dbName + ".ptned_tmp");
+    verifyFail("SELECT * FROM " + dbName + ".ptned");
+
+    // Dump all the events except DROP
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    // The tables and data should still be there, as the DROP events are not part of this dump
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=2) ORDER BY a", ptn_data_2);
+
+    // Dump the DROP events and verify that the tables are dropped in the target as well
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyFail("SELECT * FROM " + dbName + ".ptned_tmp");
+    verifyFail("SELECT * FROM " + dbName + ".ptned");
+  }
+
+  @Test
   public void testViewsReplication() throws IOException {
     String testName = "viewsReplication";
     String dbName = createDB(testName);

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index dff1195..c33ade1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -24,20 +24,16 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 public class InsertEvent extends ListenerEvent {
 
-  // Note that this event is fired from the client, so rather than having full metastore objects
-  // we have just the string names, but that's fine for what we need.
-  private final String db;
-  private final String table;
-  private final Map<String, String> keyValues;
+  private final Table tableObj;
+  private final Partition ptnObj;
   private final boolean replace;
   private final List<String> files;
   private List<String> fileChecksums = new ArrayList<String>();
@@ -55,42 +51,36 @@ public class InsertEvent extends ListenerEvent {
       InsertEventRequestData insertData, boolean status, HMSHandler handler) throws MetaException,
       NoSuchObjectException {
     super(status, handler);
-    this.db = db;
-    this.table = table;
 
-    // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
-    this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
-    this.files = insertData.getFilesAdded();
     GetTableRequest req = new GetTableRequest(db, table);
     req.setCapabilities(HiveMetaStoreClient.TEST_VERSION);
-    Table t = handler.get_table_req(req).getTable();
-    keyValues = new LinkedHashMap<String, String>();
+    this.tableObj = handler.get_table_req(req).getTable();
     if (partVals != null) {
-      for (int i = 0; i < partVals.size(); i++) {
-        keyValues.put(t.getPartitionKeys().get(i).getName(), partVals.get(i));
-      }
+      this.ptnObj = handler.get_partition(db, table, partVals);
+    } else {
+      this.ptnObj = null;
     }
+
+    // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
+    this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
+    this.files = insertData.getFilesAdded();
     if (insertData.isSetFilesAddedChecksum()) {
       fileChecksums = insertData.getFilesAddedChecksum();
     }
   }
 
-  public String getDb() {
-    return db;
-  }
-
   /**
-   * @return The table.
+   * @return Table object
    */
-  public String getTable() {
-    return table;
+  public Table getTableObj() {
+    return tableObj;
   }
 
   /**
-   * @return List of values for the partition keys.
+   * @return Partition object
    */
-  public Map<String, String> getPartitionKeyValues() {
-    return keyValues;
+  public Partition getPartitionObj() {
+    return ptnObj;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index 6d146e0..6505c67 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -19,7 +19,8 @@
 
 package org.apache.hadoop.hive.metastore.messaging;
 
-import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 
 /**
  * HCat message sent when an insert is done to a table or partition.
@@ -43,19 +44,26 @@ public abstract class InsertMessage extends EventMessage {
   public abstract boolean isReplace();
 
   /**
-   * Get the map of partition keyvalues.  Will be null if this insert is to a table and not a
-   * partition.
-   * @return Map of partition keyvalues, or null.
-   */
-  public abstract Map<String,String> getPartitionKeyValues();
-
-  /**
    * Get list of file name and checksum created as a result of this DML operation
    *
    * @return The iterable of files
    */
   public abstract Iterable<String> getFiles();
 
+  /**
+   * Get the table object associated with the insert
+   *
+   * @return The Table object associated with the insert
+   */
+  public abstract Table getTableObj() throws Exception;
+
+  /**
+   * Get the partition object associated with the insert
+   *
+   * @return The Partition object if the table is partitioned, else null.
+   */
+  public abstract Partition getPtnObj() throws Exception;
+
   @Override
   public EventMessage checkValid() {
     if (getTable() == null)

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index 1bd52a8..9437e8b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.util.Iterator;
-import java.util.Map;
 
 /**
  * Abstract Factory for the construction of HCatalog message instances.
@@ -229,14 +228,13 @@ public abstract class MessageFactory {
   /**
    * Factory method for building insert message
    *
-   * @param db Name of the database the insert occurred in
-   * @param table Name of the table the insert occurred in
-   * @param partVals Partition values for the partition that the insert occurred in, may be null if
+   * @param tableObj Table object the insert occurred in
+   * @param ptnObj Partition object the insert occurred in, may be null if
    *          the insert was done into a non-partitioned table
    * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO
    * @param files Iterator of file created
    * @return instance of InsertMessage
    */
-  public abstract InsertMessage buildInsertMessage(String db, String table,
-      Map<String, String> partVals, boolean replace, Iterator<String> files);
+  public abstract InsertMessage buildInsertMessage(Table tableObj, Partition ptnObj,
+                                                   boolean replace, Iterator<String> files);
 }
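
For message producers, buildInsertMessage now takes the Thrift objects directly. A rough sketch of the new call shape, assuming the caller already holds the Table and (optional) Partition objects; the sample file path and variable names are made up for illustration:

import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

public class BuildInsertMessageSketch {
  public static String buildPayload(Table tableObj, Partition ptnObj, boolean replace) {
    // Files added by the insert; a single made-up path for illustration.
    Iterator<String> files = Arrays.asList("hdfs:///warehouse/tbl/part=1/000000_0").iterator();
    MessageFactory msgFactory = MessageFactory.getInstance();
    // New in this patch: Table/Partition objects instead of db/table names and a
    // partition key-value map. ptnObj may be null for an unpartitioned table.
    InsertMessage msg = msgFactory.buildInsertMessage(tableObj, ptnObj, replace, files);
    return msg.toString();  // JSON string that is stored in the NotificationEvent
  }
}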

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index c059d47..18a15f5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -19,14 +19,16 @@
 
 package org.apache.hadoop.hive.metastore.messaging.json;
 
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 import com.google.common.collect.Lists;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 /**
  * JSON implementation of InsertMessage
@@ -34,7 +36,7 @@ import java.util.Map;
 public class JSONInsertMessage extends InsertMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db, table;
+  String server, servicePrincipal, db, table, tableObjJson, ptnObjJson;
 
   @JsonProperty
   Long timestamp;
@@ -45,25 +47,39 @@ public class JSONInsertMessage extends InsertMessage {
   @JsonProperty
   List<String> files;
 
-  @JsonProperty
-  Map<String, String> partKeyVals;
-
   /**
    * Default constructor, needed for Jackson.
    */
   public JSONInsertMessage() {
   }
 
-  public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
-      Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter, Long timestamp) {
+  public JSONInsertMessage(String server, String servicePrincipal, Table tableObj, Partition ptnObj,
+                           boolean replace, Iterator<String> fileIter, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
-    this.db = db;
-    this.table = table;
+
+    if (null == tableObj) {
+      throw new IllegalArgumentException("Table not valid.");
+    }
+
+    this.db = tableObj.getDbName();
+    this.table = tableObj.getTableName();
+
+    try {
+      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+      if (null != ptnObj) {
+        this.ptnObjJson = JSONMessageFactory.createPartitionObjJson(ptnObj);
+      } else {
+        this.ptnObjJson = null;
+      }
+    } catch (TException e) {
+      throw new IllegalArgumentException("Could not serialize: ", e);
+    }
+
     this.timestamp = timestamp;
     this.replace = Boolean.toString(replace);
-    this.partKeyVals = partKeyVals;
     this.files = Lists.newArrayList(fileIter);
+
     checkValid();
   }
 
@@ -78,11 +94,6 @@ public class JSONInsertMessage extends InsertMessage {
   }
 
   @Override
-  public Map<String, String> getPartitionKeyValues() {
-    return partKeyVals;
-  }
-
-  @Override
   public Iterable<String> getFiles() {
     return files;
   }
@@ -106,6 +117,16 @@ public class JSONInsertMessage extends InsertMessage {
   public boolean isReplace() { return Boolean.parseBoolean(replace); }
 
   @Override
+  public Table getTableObj() throws Exception {
+    return (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+  }
+
+  @Override
+  public Partition getPtnObj() throws Exception {
+    return ((null == ptnObjJson) ? null : (Partition) JSONMessageFactory.getTObj(ptnObjJson, Partition.class));
+  }
+
+  @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 04a4041..a4c31f2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -28,10 +28,6 @@ import javax.annotation.Nullable;
 
 import com.google.common.collect.Iterables;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.Index;
@@ -165,9 +161,9 @@ public class JSONMessageFactory extends MessageFactory {
   }
 
   @Override
-  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, boolean replace,
-      Iterator<String> fileIter) {
-    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now());
+  public InsertMessage buildInsertMessage(Table tableObj, Partition partObj,
+                                          boolean replace, Iterator<String> fileIter) {
+    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, tableObj, partObj, replace, fileIter, now());
   }
 
   private long now() {

http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index f514fb2..956bb08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -23,18 +23,15 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.thrift.TException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 class InsertHandler extends AbstractEventHandler {
 
@@ -45,11 +42,10 @@ class InsertHandler extends AbstractEventHandler {
   @Override
   public void handle(Context withinContext) throws Exception {
     InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
-    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
-    Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg);
     List<Partition> qlPtns = null;
-    if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
-      qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
+    if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) {
+      qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));
     }
     Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
 
@@ -88,13 +84,13 @@ class InsertHandler extends AbstractEventHandler {
     dmd.write();
   }
 
-  private org.apache.hadoop.hive.ql.metadata.Table tableObject(
-      Context withinContext, InsertMessage insertMsg) throws TException {
-    return new org.apache.hadoop.hive.ql.metadata.Table(
-        withinContext.db.getMSC().getTable(
-            insertMsg.getDB(), insertMsg.getTable()
-        )
-    );
+  private org.apache.hadoop.hive.ql.metadata.Table tableObject(InsertMessage insertMsg) throws Exception {
+    return new org.apache.hadoop.hive.ql.metadata.Table(insertMsg.getTableObj());
+  }
+
+  private org.apache.hadoop.hive.ql.metadata.Partition partitionObject(
+          org.apache.hadoop.hive.ql.metadata.Table qlMdTable, InsertMessage insertMsg) throws Exception {
+    return new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, insertMsg.getPtnObj());
   }
 
   private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
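
On the dump side, InsertHandler can now build the ql-layer Table and Partition entirely from the message, which is what keeps REPL DUMP working when the table has since been dropped. A condensed sketch of that flow, under the assumption that the InsertMessage has already been deserialized (class and method names below are illustrative, not part of the patch):

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;

public class InsertDumpSketch {
  // Returns the partitions touched by the insert (at most one), built without
  // consulting the metastore, so it works even if the table was since dropped.
  public static List<Partition> touchedPartitions(InsertMessage insertMsg) throws Exception {
    Table qlMdTable = new Table(insertMsg.getTableObj());
    if (qlMdTable.isPartitioned() && insertMsg.getPtnObj() != null) {
      return Collections.singletonList(new Partition(qlMdTable, insertMsg.getPtnObj()));
    }
    return Collections.<Partition>emptyList();
  }
}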