Posted to commits@hive.apache.org by vg...@apache.org on 2017/01/09 21:06:37 UTC
hive git commit: HIVE-15366: REPL LOAD & DUMP support for incremental INSERT events (Vaibhav Gumashta reviewed by Sushanth Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master 01e691c5c -> 2f501a8a0
HIVE-15366: REPL LOAD & DUMP support for incremental INSERT events (Vaibhav Gumashta reviewed by Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f501a8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f501a8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f501a8a
Branch: refs/heads/master
Commit: 2f501a8a024bf25701f97f4621ceda9b080be95d
Parents: 01e691c
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon Jan 9 13:05:47 2017 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon Jan 9 13:05:47 2017 -0800
----------------------------------------------------------------------
.../listener/TestDbNotificationListener.java | 27 ++----
.../hive/ql/TestReplicationScenarios.java | 92 ++++++++++++++++++++
metastore/if/hive_metastore.thrift | 4 +-
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 4 +-
.../metastore/api/InsertEventRequestData.java | 40 ++++-----
.../gen/thrift/gen-rb/hive_metastore_types.rb | 2 +-
.../hive/metastore/events/InsertEvent.java | 5 +-
.../hive/metastore/messaging/InsertMessage.java | 4 +-
.../metastore/messaging/MessageFactory.java | 3 +-
.../messaging/json/JSONInsertMessage.java | 27 +++---
.../messaging/json/JSONMessageFactory.java | 2 +-
.../hadoop/hive/ql/exec/ReplCopyTask.java | 14 +--
.../apache/hadoop/hive/ql/metadata/Hive.java | 9 +-
.../apache/hadoop/hive/ql/parse/EximUtil.java | 30 +++++--
.../hive/ql/parse/ExportSemanticAnalyzer.java | 4 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 4 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 47 +++++++++-
17 files changed, 230 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 39356ae..4eabb24 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -913,7 +913,7 @@ public class TestDbNotificationListener {
assertEquals(defaultDbName, event.getDbName());
assertEquals(tblName, event.getTableName());
// Parse the message field
- verifyInsertJSON(event, defaultDbName, tblName, false);
+ verifyInsertJSON(event, defaultDbName, tblName);
}
@Test
@@ -967,7 +967,7 @@ public class TestDbNotificationListener {
assertEquals(defaultDbName, event.getDbName());
assertEquals(tblName, event.getTableName());
// Parse the message field
- verifyInsertJSON(event, defaultDbName, tblName, false);
+ verifyInsertJSON(event, defaultDbName, tblName);
ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
LinkedHashMap<String, String> partKeyValsFromNotif =
JSONMessageFactory.getAsMap((ObjectNode) jsonTree.get("partKeyVals"),
@@ -1057,7 +1057,7 @@ public class TestDbNotificationListener {
assertEquals(firstEventId + 3, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
// Parse the message field
- verifyInsertJSON(event, defaultDbName, tblName, true);
+ verifyInsertJSON(event, defaultDbName, tblName);
event = rsp.getEvents().get(4);
assertEquals(firstEventId + 5, event.getEventId());
@@ -1090,7 +1090,7 @@ public class TestDbNotificationListener {
assertEquals(firstEventId + 3, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
// Parse the message field
- verifyInsertJSON(event, null, sourceTblName, true);
+ verifyInsertJSON(event, null, sourceTblName);
event = rsp.getEvents().get(4);
assertEquals(firstEventId + 5, event.getEventId());
@@ -1165,13 +1165,13 @@ public class TestDbNotificationListener {
assertEquals(firstEventId + 4, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
// Parse the message field
- verifyInsertJSON(event, null, tblName, true);
+ verifyInsertJSON(event, null, tblName);
event = rsp.getEvents().get(6);
assertEquals(firstEventId + 7, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
// Parse the message field
- verifyInsertJSON(event, null, tblName, true);
+ verifyInsertJSON(event, null, tblName);
event = rsp.getEvents().get(9);
assertEquals(firstEventId + 10, event.getEventId());
@@ -1181,13 +1181,13 @@ public class TestDbNotificationListener {
assertEquals(firstEventId + 11, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
// Parse the message field
- verifyInsertJSON(event, null, tblName, true);
+ verifyInsertJSON(event, null, tblName);
event = rsp.getEvents().get(13);
assertEquals(firstEventId + 14, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
// Parse the message field
- verifyInsertJSON(event, null, tblName, true);
+ verifyInsertJSON(event, null, tblName);
event = rsp.getEvents().get(16);
assertEquals(firstEventId + 17, event.getEventId());
@@ -1223,8 +1223,7 @@ public class TestDbNotificationListener {
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
}
- private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName,
- boolean verifyChecksums) throws Exception {
+ private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName) throws Exception {
// Parse the message field
ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
System.out.println("JSONInsertMessage: " + jsonTree.toString());
@@ -1239,14 +1238,6 @@ public class TestDbNotificationListener {
List<String> files =
JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("files"), new ArrayList<String>());
assertTrue(files.size() > 0);
- if (verifyChecksums) {
- // Should have list of file checksums
- List<String> fileChecksums =
- JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("fileChecksums"),
- new ArrayList<String>());
- assertTrue(fileChecksums.size() > 0);
-
- }
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index e29aa22..6b86080 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -592,6 +592,98 @@ public class TestReplicationScenarios {
verifyResults(ptn_data_2);
}
+ @Test
+ public void testIncrementalInserts() throws IOException {
+ String testName = "incrementalInserts";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName
+ + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[] { "eleven", "twelve" };
+ String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "sixteen", "seventeen" };
+ String[] empty = new String[] {};
+
+ String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("SELECT a from " + dbName + ".ptned_empty");
+ verifyResults(empty);
+ run("SELECT * from " + dbName + ".unptned_empty");
+ verifyResults(empty);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned");
+ verifyResults(unptn_data);
+ run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+ run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned_late");
+ verifyResults(unptn_data);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ run("SELECT * from " + dbName + "_dupe.unptned_late");
+ verifyResults(unptn_data);
+
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
+ + ".ptned PARTITION(b=1)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName
+ + ".ptned PARTITION(b=2)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=2");
+ verifyResults(ptn_data_2);
+
+ run("CREATE TABLE " + dbName
+ + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+ run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+ + ".ptned WHERE b=1");
+ run("SELECT a from " + dbName + ".ptned_late WHERE b=1");
+ verifyResults(ptn_data_1);
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+ + ".ptned WHERE b=2");
+ run("SELECT a from " + dbName + ".ptned_late WHERE b=2");
+ verifyResults(ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2");
+ verifyResults(ptn_data_2);
+ }
private String getResult(int rowNum, int colNum) throws IOException {
return getResult(rowNum,colNum,false);
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index 79592ea..bf80455 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -812,8 +812,8 @@ struct CurrentNotificationEventId {
struct InsertEventRequestData {
1: required list<string> filesAdded,
- // Checksum of files (UTF8 encoded string) added during this insert event (at the time they were added)
- 2: optional list<binary> filesAddedChecksum,
+ // Checksum of files (hex string of checksum byte payload)
+ 2: optional list<string> filesAddedChecksum,
}
union FireEventRequestData {
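
A minimal sketch of populating the revised struct from Java, using the no-arg constructor and the generated helpers that appear later in this diff (the file path and checksum literals are hypothetical):

import java.util.Arrays;

import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;

public class InsertEventRequestDataSketch {
  public static InsertEventRequestData build() {
    InsertEventRequestData data = new InsertEventRequestData();
    // required: the files written by this insert
    data.setFilesAdded(Arrays.asList("hdfs://nn:8020/warehouse/t/000000_0"));
    // optional: checksums now travel as hex strings instead of binary blobs
    data.addToFilesAddedChecksum("1b2d4f");
    return data;
  }
}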
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 1311b20..d605049 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -16166,7 +16166,7 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr
uint32_t _i660;
for (_i660 = 0; _i660 < _size656; ++_i660)
{
- xfer += iprot->readBinary(this->filesAddedChecksum[_i660]);
+ xfer += iprot->readString(this->filesAddedChecksum[_i660]);
}
xfer += iprot->readListEnd();
}
@@ -16213,7 +16213,7 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
std::vector<std::string> ::const_iterator _iter662;
for (_iter662 = this->filesAddedChecksum.begin(); _iter662 != this->filesAddedChecksum.end(); ++_iter662)
{
- xfer += oprot->writeBinary((*_iter662));
+ xfer += oprot->writeString((*_iter662));
}
xfer += oprot->writeListEnd();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
index 39a607d..fd1dc06 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
@@ -48,7 +48,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
}
private List<String> filesAdded; // required
- private List<ByteBuffer> filesAddedChecksum; // optional
+ private List<String> filesAddedChecksum; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -121,7 +121,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
tmpMap.put(_Fields.FILES_ADDED_CHECKSUM, new org.apache.thrift.meta_data.FieldMetaData("filesAddedChecksum", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))));
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InsertEventRequestData.class, metaDataMap);
}
@@ -145,7 +145,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
this.filesAdded = __this__filesAdded;
}
if (other.isSetFilesAddedChecksum()) {
- List<ByteBuffer> __this__filesAddedChecksum = new ArrayList<ByteBuffer>(other.filesAddedChecksum);
+ List<String> __this__filesAddedChecksum = new ArrayList<String>(other.filesAddedChecksum);
this.filesAddedChecksum = __this__filesAddedChecksum;
}
}
@@ -202,22 +202,22 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
return (this.filesAddedChecksum == null) ? 0 : this.filesAddedChecksum.size();
}
- public java.util.Iterator<ByteBuffer> getFilesAddedChecksumIterator() {
+ public java.util.Iterator<String> getFilesAddedChecksumIterator() {
return (this.filesAddedChecksum == null) ? null : this.filesAddedChecksum.iterator();
}
- public void addToFilesAddedChecksum(ByteBuffer elem) {
+ public void addToFilesAddedChecksum(String elem) {
if (this.filesAddedChecksum == null) {
- this.filesAddedChecksum = new ArrayList<ByteBuffer>();
+ this.filesAddedChecksum = new ArrayList<String>();
}
this.filesAddedChecksum.add(elem);
}
- public List<ByteBuffer> getFilesAddedChecksum() {
+ public List<String> getFilesAddedChecksum() {
return this.filesAddedChecksum;
}
- public void setFilesAddedChecksum(List<ByteBuffer> filesAddedChecksum) {
+ public void setFilesAddedChecksum(List<String> filesAddedChecksum) {
this.filesAddedChecksum = filesAddedChecksum;
}
@@ -250,7 +250,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
if (value == null) {
unsetFilesAddedChecksum();
} else {
- setFilesAddedChecksum((List<ByteBuffer>)value);
+ setFilesAddedChecksum((List<String>)value);
}
break;
@@ -396,7 +396,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
if (this.filesAddedChecksum == null) {
sb.append("null");
} else {
- org.apache.thrift.TBaseHelper.toString(this.filesAddedChecksum, sb);
+ sb.append(this.filesAddedChecksum);
}
first = false;
}
@@ -469,11 +469,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list561 = iprot.readListBegin();
- struct.filesAddedChecksum = new ArrayList<ByteBuffer>(_list561.size);
- ByteBuffer _elem562;
+ struct.filesAddedChecksum = new ArrayList<String>(_list561.size);
+ String _elem562;
for (int _i563 = 0; _i563 < _list561.size; ++_i563)
{
- _elem562 = iprot.readBinary();
+ _elem562 = iprot.readString();
struct.filesAddedChecksum.add(_elem562);
}
iprot.readListEnd();
@@ -513,9 +513,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
oprot.writeFieldBegin(FILES_ADDED_CHECKSUM_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.filesAddedChecksum.size()));
- for (ByteBuffer _iter565 : struct.filesAddedChecksum)
+ for (String _iter565 : struct.filesAddedChecksum)
{
- oprot.writeBinary(_iter565);
+ oprot.writeString(_iter565);
}
oprot.writeListEnd();
}
@@ -554,9 +554,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
if (struct.isSetFilesAddedChecksum()) {
{
oprot.writeI32(struct.filesAddedChecksum.size());
- for (ByteBuffer _iter567 : struct.filesAddedChecksum)
+ for (String _iter567 : struct.filesAddedChecksum)
{
- oprot.writeBinary(_iter567);
+ oprot.writeString(_iter567);
}
}
}
@@ -580,11 +580,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TList _list571 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
- struct.filesAddedChecksum = new ArrayList<ByteBuffer>(_list571.size);
- ByteBuffer _elem572;
+ struct.filesAddedChecksum = new ArrayList<String>(_list571.size);
+ String _elem572;
for (int _i573 = 0; _i573 < _list571.size; ++_i573)
{
- _elem572 = iprot.readBinary();
+ _elem572 = iprot.readString();
struct.filesAddedChecksum.add(_elem572);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index ebed504..b6050c6 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2506,7 +2506,7 @@ class InsertEventRequestData
FIELDS = {
FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}},
- FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING, :binary => true}, :optional => true}
+ FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
}
def struct_fields; FIELDS; end
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index d9a42a7..7bc0e04 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -40,7 +39,7 @@ public class InsertEvent extends ListenerEvent {
private final String table;
private final Map<String, String> keyValues;
private final List<String> files;
- private List<ByteBuffer> fileChecksums = new ArrayList<ByteBuffer>();
+ private List<String> fileChecksums = new ArrayList<String>();
/**
*
@@ -104,7 +103,7 @@ public class InsertEvent extends ListenerEvent {
*
* @return
*/
- public List<ByteBuffer> getFileChecksums() {
+ public List<String> getFileChecksums() {
return fileChecksums;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index fe747df..7e6e34e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -45,7 +45,9 @@ public abstract class InsertMessage extends EventMessage {
public abstract Map<String,String> getPartitionKeyValues();
/**
- * Get the list of files created as a result of this DML operation. May be null.
+ * Get the list of files created as a result of this DML operation. May be null. The file uri may
+ * be an encoded uri, which represents both a uri and the file checksum
+ *
* @return List of new files, or null.
*/
public abstract List<String> getFiles();
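
For illustration, a hypothetical pair of entries as they could appear in the list returned by getFiles(), assuming the "uri#checksum" fragment encoding this commit introduces in EximUtil:

String plain   = "hdfs://nn:8020/warehouse/t/000000_0";        // no checksum available
String encoded = "hdfs://nn:8020/warehouse/t/000000_0#1b2d4f"; // checksum in the uri fragment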
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index fdb8e80..df25f43 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.util.ReflectionUtils;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -253,5 +252,5 @@ public abstract class MessageFactory {
* @return instance of InsertMessage
*/
public abstract InsertMessage buildInsertMessage(String db, String table,
- Map<String, String> partVals, List<String> files, List<ByteBuffer> fileChecksums);
+ Map<String, String> partVals, List<String> files, List<String> fileChecksums);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index bd9f9ec..820cc9c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -22,8 +22,6 @@ package org.apache.hadoop.hive.metastore.messaging.json;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.codehaus.jackson.annotate.JsonProperty;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -44,9 +42,6 @@ public class JSONInsertMessage extends InsertMessage {
@JsonProperty
Map<String, String> partKeyVals;
- @JsonProperty
- List<byte[]> fileChecksums;
-
/**
* Default constructor, needed for Jackson.
*/
@@ -66,17 +61,21 @@ public class JSONInsertMessage extends InsertMessage {
}
public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
- Map<String, String> partKeyVals, List<String> files, List<ByteBuffer> checksums,
- Long timestamp) {
+ Map<String, String> partKeyVals, List<String> files, List<String> checksums, Long timestamp) {
this(server, servicePrincipal, db, table, partKeyVals, files, timestamp);
- fileChecksums = new ArrayList<byte[]>();
- for (ByteBuffer checksum : checksums) {
- byte[] checksumBytes = new byte[checksum.remaining()];
- checksum.get(checksumBytes);
- fileChecksums.add(checksumBytes);
+ for (int i = 0; i < files.size(); i++) {
+ if ((!checksums.isEmpty()) && (checksums.get(i) != null) && !checksums.get(i).isEmpty()) {
+ files.set(i, encodeFileUri(files.get(i), checksums.get(i)));
+ }
}
}
+ // TODO: this needs to be enhanced once change management based filesystem is implemented
+ // Currently using fileuri#checksum as the format
+ private String encodeFileUri(String fileUriStr, String fileChecksum) {
+ return fileUriStr + "#" + fileChecksum;
+ }
+
@Override
public String getTable() {
return table;
@@ -112,10 +111,6 @@ public class JSONInsertMessage extends InsertMessage {
return timestamp;
}
- public List<byte[]> getFileChecksums() {
- return fileChecksums;
- }
-
@Override
public String toString() {
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 9954902..2749371 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -176,7 +176,7 @@ public class JSONMessageFactory extends MessageFactory {
@Override
public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
- List<String> files, List<ByteBuffer> fileChecksums) {
+ List<String> files, List<String> fileChecksums) {
return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals,
files, fileChecksums, now());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 4c0f817..e6b943b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
@@ -27,7 +27,6 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.util.ArrayList;
@@ -113,7 +112,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
BufferedWriter listBW = null;
if (rwork.getListFilesOnOutputBehaviour()){
- Path listPath = new Path(toPath,"_files");
+ Path listPath = new Path(toPath,EximUtil.FILES_NAME);
LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
if (dstFs.exists(listPath)){
console.printError("Cannot make target _files file:" + listPath.toString());
@@ -169,7 +168,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
throws IOException {
- Path fileListing = new Path(path, "_files");
+ Path fileListing = new Path(path, EximUtil.FILES_NAME);
LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri());
if (! fs.exists(fileListing)){
LOG.debug("ReplCopyTask : _files does not exist");
@@ -184,8 +183,11 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
String line = null;
while ( (line = br.readLine()) != null){
LOG.debug("ReplCopyTask :_filesReadLine:" + line);
- Path p = new Path(line);
- FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit
+ String fileUriStr = EximUtil.getCMDecodedFileName(line);
+ // TODO HIVE-15490: Add checksum validation here
+ Path p = new Path(fileUriStr);
+ // TODO: again, fs cache should make this okay, but if not, revisit
+ FileSystem srcFs = p.getFileSystem(conf);
ret.add(srcFs.getFileStatus(p));
// Note - we need srcFs rather than fs, because it is possible that the _files lists files
// which are from a different filesystem than the fs where the _files file itself was loaded
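
A condensed, self-contained sketch of the read path above, assuming a _files manifest whose lines may carry a "#<checksum>" fragment (the class name is illustrative; checksum validation is deferred to HIVE-15490):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.parse.EximUtil;

public class FilesManifestReader {
  // Resolves every file listed in a _files manifest on its own filesystem.
  public static List<FileStatus> read(FileSystem fs, Path manifest, Configuration conf)
      throws IOException {
    List<FileStatus> ret = new ArrayList<FileStatus>();
    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(manifest)));
    try {
      String line;
      while ((line = br.readLine()) != null) {
        // Drop the "#<checksum>" fragment before building the Path
        Path p = new Path(EximUtil.getCMDecodedFileName(line));
        // A line may point at a different filesystem than the one holding _files
        FileSystem srcFs = p.getFileSystem(conf);
        ret.add(srcFs.getFileStatus(p));
      }
    } finally {
      br.close();
    }
    return ret;
  }
}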
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index be5a6a9..c5b3517 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2349,11 +2349,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
FileChecksum cksum = fileSystem.getFileChecksum(p);
// File checksum is not implemented for local filesystem (RawLocalFileSystem)
if (cksum != null) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- cksum.write(new DataOutputStream(baos));
- insertData.addToFilesAddedChecksum(ByteBuffer.wrap(baos.toByteArray()));
+ String checksumString =
+ StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
+ insertData.addToFilesAddedChecksum(checksumString);
} else {
- insertData.addToFilesAddedChecksum(ByteBuffer.allocate(0));
+ // Add an empty checksum string for filesystems that don't generate one
+ insertData.addToFilesAddedChecksum("");
}
}
} else {
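
A self-contained sketch of the checksum encoding above (the class name is illustrative): FileSystem.getFileChecksum returns null on filesystems such as RawLocalFileSystem, which is why the empty-string fallback exists.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;

public class ChecksumStrings {
  // Returns the hex form of a file's checksum, or "" when the
  // filesystem does not implement checksums.
  public static String checksumString(Path p, Configuration conf) throws IOException {
    FileSystem fs = p.getFileSystem(conf);
    FileChecksum cksum = fs.getFileChecksum(p);
    if (cksum == null) {
      return "";
    }
    return StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
  }
}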
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 6e9602f..34e53d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -19,9 +19,9 @@
package org.apache.hadoop.hive.ql.parse;
import com.google.common.base.Function;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -50,6 +51,7 @@ import org.json.JSONException;
import org.json.JSONObject;
import javax.annotation.Nullable;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -73,7 +75,10 @@ import java.util.TreeMap;
*/
public class EximUtil {
- public static final String METADATA_NAME="_metadata";
+ public static final String METADATA_NAME = "_metadata";
+ public static final String FILES_NAME = "_files";
+ public static final String DATA_PATH_NAME = "data";
+ public static final String URI_FRAGMENT_SEPARATOR = "#";
private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
@@ -278,6 +283,7 @@ public class EximUtil {
if (replicationSpec == null){
replicationSpec = new ReplicationSpec(); // instantiate default values if not specified
}
+
if (tableHandle == null){
replicationSpec.setNoop(true);
}
@@ -351,10 +357,6 @@ public class EximUtil {
jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
}
- private static void write(OutputStream out, String s) throws IOException {
- out.write(s.getBytes("UTF-8"));
- }
-
/**
* Utility class to help return complex value from readMetaData function
*/
@@ -571,4 +573,20 @@ public class EximUtil {
}
};
}
+
+ public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) {
+ // The checksum is set as the fragment portion of the file uri
+ return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
+ }
+
+ public static String getCMDecodedFileName(String encodedFileURIStr) {
+ String[] uriAndFragment = encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR);
+ return uriAndFragment[0];
+ }
+
+ public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) {
+ // TODO: Implement this as part of HIVE-15490
+ return null;
+ }
+
}
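
A minimal round trip through the helpers above (the uri and checksum literals are hypothetical). Note that getCMDecodedFileName keeps only the text before the first URI_FRAGMENT_SEPARATOR, so a line without a fragment decodes to itself:

import org.apache.hadoop.hive.ql.parse.EximUtil;

public class CMFileNameRoundTrip {
  public static void main(String[] args) {
    String uri = "hdfs://nn:8020/warehouse/t/000000_0";
    String cksum = "1b2d4f";
    String encoded = EximUtil.getCMEncodedFileName(uri, cksum);
    System.out.println(encoded); // hdfs://nn:8020/warehouse/t/000000_0#1b2d4f
    System.out.println(EximUtil.getCMDecodedFileName(encoded)); // hdfs://nn:8020/warehouse/t/000000_0
  }
}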
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index f61274b..08bad63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -170,7 +170,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
partitions = null;
}
- Path path = new Path(ctx.getLocalTmpPath(), "_metadata");
+ Path path = new Path(ctx.getLocalTmpPath(), EximUtil.METADATA_NAME);
EximUtil.createExportDump(
FileSystem.getLocal(conf),
path,
@@ -202,7 +202,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
} else {
Path fromPath = ts.tableHandle.getDataLocation();
- Path toDataPath = new Path(parentPath, "data");
+ Path toDataPath = new Path(parentPath, EximUtil.DATA_PATH_NAME);
Task<? extends Serializable> rTask =
ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
rootTasks.add(rTask);
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 5561e06..8c5cac2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -344,7 +344,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
- Path dataPath = new Path(fromURI.toString(), "data");
+ Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
@@ -777,7 +777,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) {
x.getLOG().debug("Importing in place, no emptiness check, no copying/loading");
- Path dataPath = new Path(fromURI.toString(), "data");
+ Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
tblDesc.setLocation(dataPath.toString());
} else {
Path tablePath = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 9b83407..85f8c64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
+
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -61,13 +63,15 @@ import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.IOUtils;
-
import javax.annotation.Nullable;
+
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
@@ -108,6 +112,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
+ EVENT_INSERT("EVENT_INSERT"),
EVENT_UNKNOWN("EVENT_UNKNOWN");
String type = null;
@@ -559,7 +564,39 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
dmd.write();
break;
}
-
+ }
+ case MessageFactory.INSERT_EVENT: {
+ InsertMessage insertMsg = md.getInsertMessage(ev.getMessage());
+ String tblName = insertMsg.getTable();
+ Table qlMdTable = db.getTable(tblName);
+ Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+ List<Partition> qlPtns = null;
+ if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+ qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false));
+ }
+ Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+ EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable, qlPtns,
+ replicationSpec);
+ Path dataPath = new Path(evRoot, EximUtil.DATA_PATH_NAME);
+ Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+ FileSystem fs = dataPath.getFileSystem(conf);
+ BufferedWriter fileListWriter =
+ new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+ try {
+ // TODO: HIVE-15205: move this metadata generation to a task
+ // Get the encoded filename of files that are being inserted
+ List<String> files = insertMsg.getFiles();
+ for (String fileUriStr : files) {
+ fileListWriter.write(fileUriStr + "\n");
+ }
+ } finally {
+ fileListWriter.close();
+ }
+ LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage());
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid);
+ dmd.setPayload(ev.getMessage());
+ dmd.write();
+ break;
}
// TODO : handle other event types
default:
@@ -957,6 +994,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec);
return tasks;
}
+ case EVENT_INSERT: {
+ md = MessageFactory.getInstance().getDeserializer();
+ InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
+ // Piggybacking in Import logic for now
+ return analyzeTableLoad(insertMessage.getDB(), insertMessage.getTable(), locn, precursor);
+ }
case EVENT_UNKNOWN: {
break;
}
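
A condensed sketch of the dump-side manifest write in the INSERT branch above, assuming the event's file list already carries encoded uris (the class name is illustrative):

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.parse.EximUtil;

public class FilesManifestWriter {
  // Writes one encoded file uri per line into <evRoot>/data/_files.
  public static void write(Path evRoot, List<String> encodedFiles, Configuration conf)
      throws IOException {
    Path filesPath = new Path(new Path(evRoot, EximUtil.DATA_PATH_NAME), EximUtil.FILES_NAME);
    FileSystem fs = filesPath.getFileSystem(conf);
    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
    try {
      for (String fileUriStr : encodedFiles) {
        w.write(fileUriStr + "\n");
      }
    } finally {
      w.close();
    }
  }
}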