You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/30 21:17:19 UTC
[10/12] hive git commit: HIVE-16727 : REPL DUMP for insert event
shouldn't fail if the table is already dropped. (Sankar Hariappan via Thejas
Nair)
HIVE-16727 : REPL DUMP for insert event shouldn't fail if the table is already dropped. (Sankar Hariappan via Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8dcc78a2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8dcc78a2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8dcc78a2
Branch: refs/heads/hive-14535
Commit: 8dcc78a21488a5fe2ec9e42084a61bf38653ddd6
Parents: bbf0629
Author: Sankar Hariappan <ma...@gmail.com>
Authored: Mon May 29 23:37:07 2017 -0700
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Mon May 29 23:37:07 2017 -0700
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 11 +-
.../listener/TestDbNotificationListener.java | 28 +---
.../hive/ql/parse/TestReplicationScenarios.java | 130 +++++++++++++++++++
.../hive/metastore/events/InsertEvent.java | 44 +++----
.../hive/metastore/messaging/InsertMessage.java | 24 ++--
.../metastore/messaging/MessageFactory.java | 10 +-
.../messaging/json/JSONInsertMessage.java | 51 +++++---
.../messaging/json/JSONMessageFactory.java | 10 +-
.../parse/repl/dump/events/InsertHandler.java | 28 ++--
9 files changed, 231 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 6f96e1d..e598a6b 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -437,13 +437,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
}
@Override
public void onInsert(InsertEvent insertEvent) throws MetaException {
+ Table tableObj = insertEvent.getTableObj();
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
- insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
+ new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj,
+ insertEvent.getPartitionObj(), insertEvent.isReplace(),
new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
- .toString());
- event.setDbName(insertEvent.getDb());
- event.setTableName(insertEvent.getTable());
+ .toString());
+ event.setDbName(tableObj.getDbName());
+ event.setTableName(tableObj.getTableName());
process(event, insertEvent);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 2168a67..808c9c7 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -1227,8 +1227,9 @@ public class TestDbNotificationListener {
FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
List<FieldSchema> partCols = new ArrayList<FieldSchema>();
List<String> partCol1Vals = Arrays.asList("today");
- LinkedHashMap<String, String> partKeyVals = new LinkedHashMap<String, String>();
- partKeyVals.put("ds", "today");
+ List<String> partKeyVals = new ArrayList<String>();
+ partKeyVals.add("today");
+
partCols.add(partCol1);
Table table =
new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
@@ -1264,9 +1265,9 @@ public class TestDbNotificationListener {
// Parse the message field
verifyInsert(event, defaultDbName, tblName);
InsertMessage insertMessage = md.getInsertMessage(event.getMessage());
- Map<String,String> partKeyValsFromNotif = insertMessage.getPartitionKeyValues();
+ List<String> ptnValues = insertMessage.getPtnObj().getValues();
- assertMapEquals(partKeyVals, partKeyValsFromNotif);
+ assertEquals(partKeyVals, ptnValues);
// Verify the eventID was passed to the non-transactional listener
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3);
@@ -1528,31 +1529,16 @@ public class TestDbNotificationListener {
InsertMessage insertMsg = md.getInsertMessage(event.getMessage());
System.out.println("InsertMessage: " + insertMsg.toString());
if (dbName != null ){
- assertEquals(dbName, insertMsg.getDB());
+ assertEquals(dbName, insertMsg.getTableObj().getDbName());
}
if (tblName != null){
- assertEquals(tblName, insertMsg.getTable());
+ assertEquals(tblName, insertMsg.getTableObj().getTableName());
}
// Should have files
Iterator<String> files = insertMsg.getFiles().iterator();
assertTrue(files.hasNext());
}
-
- private void assertMapEquals(Map<String, String> map1, Map<String, String> map2) {
- // non ordered, non-classed map comparison - use sparingly instead of assertEquals
- // only if you're sure that the order does not matter.
- if ((map1 == null) || (map2 == null)){
- assertNull(map1);
- assertNull(map2);
- }
- assertEquals(map1.size(),map2.size());
- for (String k : map1.keySet()){
- assertTrue(map2.containsKey(k));
- assertEquals(map1.get(k), map2.get(k));
- }
- }
-
@Test
public void cleanupNotifs() throws Exception {
Database db = new Database("cleanup1", "no description", "file:/tmp", emptyParameters);
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 21f09ae..766d858 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1282,6 +1282,136 @@ public class TestReplicationScenarios {
}
@Test
+ public void testIncrementalInsertDropUnpartitionedTable() throws IOException {
+ String testName = "incrementalInsertDropUnpartitionedTable";
+ String dbName = createDB(testName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ // Scenario: tables are dropped on the source before their insert events are dumped; the dump must still succeed.
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[] { "eleven", "twelve" };
+
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".unptned");
+ verifySetup("SELECT a from " + dbName + ".unptned_tmp ORDER BY a", unptn_data);
+
+ // Get the last repl ID corresponding to all insert/alter/create events except DROP.
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String lastDumpIdWithoutDrop = getResult(0, 1);
+
+ // Drop all the tables on the source
+ run("DROP TABLE " + dbName + ".unptned");
+ run("DROP TABLE " + dbName + ".unptned_tmp");
+ verifyFail("SELECT * FROM " + dbName + ".unptned");
+ verifyFail("SELECT * FROM " + dbName + ".unptned_tmp");
+
+ // Dump all the events except DROP
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ // Need to find the tables and data as drop is not part of this dump
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned_tmp ORDER BY a", unptn_data);
+
+ // Dump the drop events and check if tables are getting dropped in target as well
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyFail("SELECT * FROM " + dbName + "_dupe.unptned");
+ verifyFail("SELECT * FROM " + dbName + "_dupe.unptned_tmp");
+ }
+
+ @Test
+ public void testIncrementalInsertDropPartitionedTable() throws IOException {
+ String testName = "incrementalInsertDropPartitionedTable";
+ String dbName = createDB(testName);
+ run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+ // Scenario: tables are dropped on the source before their insert events are dumped; the dump must still succeed.
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=20)");
+ run("ALTER TABLE " + dbName + ".ptned RENAME PARTITION (b=20) TO PARTITION (b=2)");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+
+ run("CREATE TABLE " + dbName + ".ptned_tmp AS SELECT * FROM " + dbName + ".ptned");
+ verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=2) ORDER BY a", ptn_data_2);
+
+ // Get the last repl ID corresponding to all insert/alter/create events except DROP.
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String lastDumpIdWithoutDrop = getResult(0, 1);
+
+ // Drop all the tables on the source
+ run("DROP TABLE " + dbName + ".ptned_tmp");
+ run("DROP TABLE " + dbName + ".ptned");
+ verifyFail("SELECT * FROM " + dbName + ".ptned_tmp");
+ verifyFail("SELECT * FROM " + dbName + ".ptned");
+
+ // Dump all the events except DROP
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ // Need to find the tables and data as drop is not part of this dump
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=2) ORDER BY a", ptn_data_2);
+
+ // Dump the drop events and check if tables are getting dropped in target as well
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyFail("SELECT * FROM " + dbName + "_dupe.ptned_tmp");
+ verifyFail("SELECT * FROM " + dbName + "_dupe.ptned");
+ }
+
+ @Test
public void testViewsReplication() throws IOException {
String testName = "viewsReplication";
String dbName = createDB(testName);
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index dff1195..c33ade1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -24,20 +24,16 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
public class InsertEvent extends ListenerEvent {
- // Note that this event is fired from the client, so rather than having full metastore objects
- // we have just the string names, but that's fine for what we need.
- private final String db;
- private final String table;
- private final Map<String, String> keyValues;
+ private final Table tableObj;
+ private final Partition ptnObj;
private final boolean replace;
private final List<String> files;
private List<String> fileChecksums = new ArrayList<String>();
@@ -55,42 +51,36 @@ public class InsertEvent extends ListenerEvent {
InsertEventRequestData insertData, boolean status, HMSHandler handler) throws MetaException,
NoSuchObjectException {
super(status, handler);
- this.db = db;
- this.table = table;
- // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
- this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
- this.files = insertData.getFilesAdded();
GetTableRequest req = new GetTableRequest(db, table);
req.setCapabilities(HiveMetaStoreClient.TEST_VERSION);
- Table t = handler.get_table_req(req).getTable();
- keyValues = new LinkedHashMap<String, String>();
+ this.tableObj = handler.get_table_req(req).getTable();
if (partVals != null) {
- for (int i = 0; i < partVals.size(); i++) {
- keyValues.put(t.getPartitionKeys().get(i).getName(), partVals.get(i));
- }
+ this.ptnObj = handler.get_partition(db, table, partVals);
+ } else {
+ this.ptnObj = null;
}
+
+ // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
+ this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
+ this.files = insertData.getFilesAdded();
if (insertData.isSetFilesAddedChecksum()) {
fileChecksums = insertData.getFilesAddedChecksum();
}
}
- public String getDb() {
- return db;
- }
-
/**
- * @return The table.
+ * @return Table object
*/
- public String getTable() {
- return table;
+ public Table getTableObj() {
+ return tableObj;
}
/**
- * @return List of values for the partition keys.
+ * @return Partition object
*/
- public Map<String, String> getPartitionKeyValues() {
- return keyValues;
+ public Partition getPartitionObj() {
+ return ptnObj;
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index 6d146e0..6505c67 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -19,7 +19,8 @@
package org.apache.hadoop.hive.metastore.messaging;
-import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
/**
* HCat message sent when an insert is done to a table or partition.
@@ -43,19 +44,26 @@ public abstract class InsertMessage extends EventMessage {
public abstract boolean isReplace();
/**
- * Get the map of partition keyvalues. Will be null if this insert is to a table and not a
- * partition.
- * @return Map of partition keyvalues, or null.
- */
- public abstract Map<String,String> getPartitionKeyValues();
-
- /**
* Get list of file name and checksum created as a result of this DML operation
*
* @return The iterable of files
*/
public abstract Iterable<String> getFiles();
+ /**
+ * Get the table object associated with the insert
+ *
+ * @return The Table object (deserialized from its JSON form by the JSON implementation)
+ */
+ public abstract Table getTableObj() throws Exception;
+
+ /**
+ * Get the partition object associated with the insert
+ *
+ * @return The Partition object, or null if the insert did not target a partition.
+ */
+ public abstract Partition getPtnObj() throws Exception;
+
@Override
public EventMessage checkValid() {
if (getTable() == null)
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index 1bd52a8..9437e8b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.Iterator;
-import java.util.Map;
/**
* Abstract Factory for the construction of HCatalog message instances.
@@ -229,14 +228,13 @@ public abstract class MessageFactory {
/**
* Factory method for building insert message
*
- * @param db Name of the database the insert occurred in
- * @param table Name of the table the insert occurred in
- * @param partVals Partition values for the partition that the insert occurred in, may be null if
+ * @param tableObj Table object in which the insert occurred
+ * @param ptnObj Partition object in which the insert occurred, may be null if
* the insert was done into a non-partitioned table
* @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO
* @param files Iterator of file created
* @return instance of InsertMessage
*/
- public abstract InsertMessage buildInsertMessage(String db, String table,
- Map<String, String> partVals, boolean replace, Iterator<String> files);
+ public abstract InsertMessage buildInsertMessage(Table tableObj, Partition ptnObj,
+ boolean replace, Iterator<String> files);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index c059d47..18a15f5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -19,14 +19,16 @@
package org.apache.hadoop.hive.metastore.messaging.json;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.thrift.TException;
import org.codehaus.jackson.annotate.JsonProperty;
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
/**
* JSON implementation of InsertMessage
@@ -34,7 +36,7 @@ import java.util.Map;
public class JSONInsertMessage extends InsertMessage {
@JsonProperty
- String server, servicePrincipal, db, table;
+ String server, servicePrincipal, db, table, tableObjJson, ptnObjJson;
@JsonProperty
Long timestamp;
@@ -45,25 +47,39 @@ public class JSONInsertMessage extends InsertMessage {
@JsonProperty
List<String> files;
- @JsonProperty
- Map<String, String> partKeyVals;
-
/**
* Default constructor, needed for Jackson.
*/
public JSONInsertMessage() {
}
- public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
- Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter, Long timestamp) {
+ public JSONInsertMessage(String server, String servicePrincipal, Table tableObj, Partition ptnObj,
+ boolean replace, Iterator<String> fileIter, Long timestamp) {
this.server = server;
this.servicePrincipal = servicePrincipal;
- this.db = db;
- this.table = table;
+
+ if (null == tableObj) {
+ throw new IllegalArgumentException("Table not valid.");
+ }
+
+ this.db = tableObj.getDbName();
+ this.table = tableObj.getTableName();
+
+ try {
+ this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+ if (null != ptnObj) {
+ this.ptnObjJson = JSONMessageFactory.createPartitionObjJson(ptnObj);
+ } else {
+ this.ptnObjJson = null;
+ }
+ } catch (TException e) {
+ throw new IllegalArgumentException("Could not serialize: ", e);
+ }
+
this.timestamp = timestamp;
this.replace = Boolean.toString(replace);
- this.partKeyVals = partKeyVals;
this.files = Lists.newArrayList(fileIter);
+
checkValid();
}
@@ -78,11 +94,6 @@ public class JSONInsertMessage extends InsertMessage {
}
@Override
- public Map<String, String> getPartitionKeyValues() {
- return partKeyVals;
- }
-
- @Override
public Iterable<String> getFiles() {
return files;
}
@@ -106,6 +117,16 @@ public class JSONInsertMessage extends InsertMessage {
public boolean isReplace() { return Boolean.parseBoolean(replace); }
@Override
+ public Table getTableObj() throws Exception {
+ return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+ }
+
+ @Override
+ public Partition getPtnObj() throws Exception {
+ return ((null == ptnObjJson) ? null : (Partition) JSONMessageFactory.getTObj(ptnObjJson, Partition.class));
+ }
+
+ @Override
public String toString() {
try {
return JSONMessageDeserializer.mapper.writeValueAsString(this);
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 04a4041..a4c31f2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -28,10 +28,6 @@ import javax.annotation.Nullable;
import com.google.common.collect.Iterables;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.Index;
@@ -165,9 +161,9 @@ public class JSONMessageFactory extends MessageFactory {
}
@Override
- public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, boolean replace,
- Iterator<String> fileIter) {
- return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now());
+ public InsertMessage buildInsertMessage(Table tableObj, Partition partObj,
+ boolean replace, Iterator<String> fileIter) {
+ return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, tableObj, partObj, replace, fileIter, now());
}
private long now() {
http://git-wip-us.apache.org/repos/asf/hive/blob/8dcc78a2/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index f514fb2..956bb08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -23,18 +23,15 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.thrift.TException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
class InsertHandler extends AbstractEventHandler {
@@ -45,11 +42,10 @@ class InsertHandler extends AbstractEventHandler {
@Override
public void handle(Context withinContext) throws Exception {
InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
- org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
- Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+ org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg);
List<Partition> qlPtns = null;
- if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
- qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
+ if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) {
+ qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));
}
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
@@ -88,13 +84,13 @@ class InsertHandler extends AbstractEventHandler {
dmd.write();
}
- private org.apache.hadoop.hive.ql.metadata.Table tableObject(
- Context withinContext, InsertMessage insertMsg) throws TException {
- return new org.apache.hadoop.hive.ql.metadata.Table(
- withinContext.db.getMSC().getTable(
- insertMsg.getDB(), insertMsg.getTable()
- )
- );
+ private org.apache.hadoop.hive.ql.metadata.Table tableObject(InsertMessage insertMsg) throws Exception {
+ return new org.apache.hadoop.hive.ql.metadata.Table(insertMsg.getTableObj());
+ }
+
+ private org.apache.hadoop.hive.ql.metadata.Partition partitionObject(
+ org.apache.hadoop.hive.ql.metadata.Table qlMdTable, InsertMessage insertMsg) throws Exception {
+ return new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, insertMsg.getPtnObj());
}
private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {