You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/04/15 00:42:37 UTC
hive git commit: HIVE-16197 : Incremental insert into a partitioned
table doesn't get replicated. (Sankar Hariappan via Sushanth Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master 153e1a69b -> 98250bbe6
HIVE-16197 : Incremental insert into a partitioned table doesn't get replicated. (Sankar Hariappan via Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/98250bbe
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/98250bbe
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/98250bbe
Branch: refs/heads/master
Commit: 98250bbe611b2bbd069049af54eda10d4b079b08
Parents: 153e1a6
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Fri Apr 14 17:41:36 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Fri Apr 14 17:42:32 2017 -0700
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 2 +-
.../hive/ql/TestReplicationScenarios.java | 89 +++++++++++--
metastore/if/hive_metastore.thrift | 5 +-
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 30 ++++-
.../gen/thrift/gen-cpp/hive_metastore_types.h | 12 +-
.../metastore/api/InsertEventRequestData.java | 127 +++++++++++++++++--
.../src/gen/thrift/gen-php/metastore/Types.php | 31 ++++-
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 25 +++-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 6 +-
.../hive/metastore/events/InsertEvent.java | 11 ++
.../hive/metastore/messaging/InsertMessage.java | 6 +
.../metastore/messaging/MessageFactory.java | 3 +-
.../messaging/json/JSONInsertMessage.java | 9 +-
.../messaging/json/JSONMessageFactory.java | 5 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 9 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 4 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 3 +-
.../hadoop/hive/ql/parse/ReplicationSpec.java | 22 ++--
.../ql/parse/repl/events/InsertHandler.java | 23 +++-
19 files changed, 350 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index bbfbc36..df423f0 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -439,7 +439,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
public void onInsert(InsertEvent insertEvent) throws MetaException {
NotificationEvent event =
new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
- insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
+ insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
.toString());
event.setDbName(insertEvent.getDb());
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index aa2123e..ec238d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -893,13 +893,81 @@ public class TestReplicationScenarios {
run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
String[] unptn_data = new String[] { "eleven", "twelve" };
+
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
- verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+ verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
- verifyRun("SELECT * from " + dbName + ".unptned_late", unptn_data);
+ verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data);
+
+ String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" };
+ String[] data_after_ovwrite = new String[] { "hundred" };
+ run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins);
+ run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
+
+ // The verification below (replication of INSERT OVERWRITE) stays commented out until the HIVE-15642 patch is in.
+ //verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+ }
+
+ @Test
+ public void testIncrementalInsertToPartition() throws IOException {
+ String testName = "incrementalInsertToPartition";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -910,14 +978,14 @@ public class TestReplicationScenarios {
run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
printOutput();
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
- verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
- verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data);
- verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data);
- verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data);
+ verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
- String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" };
- run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2] + "')");
- verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins);
+ String[] data_after_ovwrite = new String[] { "hundred" };
+ run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -929,7 +997,8 @@ public class TestReplicationScenarios {
printOutput();
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
- verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins);
+ // The verification below (replication of INSERT OVERWRITE) stays commented out until the HIVE-15642 patch is in.
+ //verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index d056498..a1bdc30 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -812,9 +812,10 @@ struct CurrentNotificationEventId {
}
struct InsertEventRequestData {
- 1: required list<string> filesAdded,
+ 1: optional bool replace,
+ 2: required list<string> filesAdded,
// Checksum of files (hex string of checksum byte payload)
- 2: optional list<string> filesAddedChecksum,
+ 3: optional list<string> filesAddedChecksum,
}
union FireEventRequestData {
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index be8429e..b38e1cb 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -16126,6 +16126,11 @@ InsertEventRequestData::~InsertEventRequestData() throw() {
}
+void InsertEventRequestData::__set_replace(const bool val) {
+ this->replace = val;
+__isset.replace = true;
+}
+
void InsertEventRequestData::__set_filesAdded(const std::vector<std::string> & val) {
this->filesAdded = val;
}
@@ -16158,6 +16163,14 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr
switch (fid)
{
case 1:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->replace);
+ this->__isset.replace = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
if (ftype == ::apache::thrift::protocol::T_LIST) {
{
this->filesAdded.clear();
@@ -16177,7 +16190,7 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr
xfer += iprot->skip(ftype);
}
break;
- case 2:
+ case 3:
if (ftype == ::apache::thrift::protocol::T_LIST) {
{
this->filesAddedChecksum.clear();
@@ -16216,7 +16229,12 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("InsertEventRequestData");
- xfer += oprot->writeFieldBegin("filesAdded", ::apache::thrift::protocol::T_LIST, 1);
+ if (this->__isset.replace) {
+ xfer += oprot->writeFieldBegin("replace", ::apache::thrift::protocol::T_BOOL, 1);
+ xfer += oprot->writeBool(this->replace);
+ xfer += oprot->writeFieldEnd();
+ }
+ xfer += oprot->writeFieldBegin("filesAdded", ::apache::thrift::protocol::T_LIST, 2);
{
xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->filesAdded.size()));
std::vector<std::string> ::const_iterator _iter661;
@@ -16229,7 +16247,7 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
xfer += oprot->writeFieldEnd();
if (this->__isset.filesAddedChecksum) {
- xfer += oprot->writeFieldBegin("filesAddedChecksum", ::apache::thrift::protocol::T_LIST, 2);
+ xfer += oprot->writeFieldBegin("filesAddedChecksum", ::apache::thrift::protocol::T_LIST, 3);
{
xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->filesAddedChecksum.size()));
std::vector<std::string> ::const_iterator _iter662;
@@ -16248,17 +16266,20 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
void swap(InsertEventRequestData &a, InsertEventRequestData &b) {
using ::std::swap;
+ swap(a.replace, b.replace);
swap(a.filesAdded, b.filesAdded);
swap(a.filesAddedChecksum, b.filesAddedChecksum);
swap(a.__isset, b.__isset);
}
InsertEventRequestData::InsertEventRequestData(const InsertEventRequestData& other663) {
+ replace = other663.replace;
filesAdded = other663.filesAdded;
filesAddedChecksum = other663.filesAddedChecksum;
__isset = other663.__isset;
}
InsertEventRequestData& InsertEventRequestData::operator=(const InsertEventRequestData& other664) {
+ replace = other664.replace;
filesAdded = other664.filesAdded;
filesAddedChecksum = other664.filesAddedChecksum;
__isset = other664.__isset;
@@ -16267,7 +16288,8 @@ InsertEventRequestData& InsertEventRequestData::operator=(const InsertEventReque
void InsertEventRequestData::printTo(std::ostream& out) const {
using ::apache::thrift::to_string;
out << "InsertEventRequestData(";
- out << "filesAdded=" << to_string(filesAdded);
+ out << "replace="; (__isset.replace ? (out << to_string(replace)) : (out << "<null>"));
+ out << ", " << "filesAdded=" << to_string(filesAdded);
out << ", " << "filesAddedChecksum="; (__isset.filesAddedChecksum ? (out << to_string(filesAddedChecksum)) : (out << "<null>"));
out << ")";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
index e73333a..50c61a7 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -6555,7 +6555,8 @@ inline std::ostream& operator<<(std::ostream& out, const CurrentNotificationEven
}
typedef struct _InsertEventRequestData__isset {
- _InsertEventRequestData__isset() : filesAddedChecksum(false) {}
+ _InsertEventRequestData__isset() : replace(false), filesAddedChecksum(false) {}
+ bool replace :1;
bool filesAddedChecksum :1;
} _InsertEventRequestData__isset;
@@ -6564,21 +6565,28 @@ class InsertEventRequestData {
InsertEventRequestData(const InsertEventRequestData&);
InsertEventRequestData& operator=(const InsertEventRequestData&);
- InsertEventRequestData() {
+ InsertEventRequestData() : replace(0) {
}
virtual ~InsertEventRequestData() throw();
+ bool replace;
std::vector<std::string> filesAdded;
std::vector<std::string> filesAddedChecksum;
_InsertEventRequestData__isset __isset;
+ void __set_replace(const bool val);
+
void __set_filesAdded(const std::vector<std::string> & val);
void __set_filesAddedChecksum(const std::vector<std::string> & val);
bool operator == (const InsertEventRequestData & rhs) const
{
+ if (__isset.replace != rhs.__isset.replace)
+ return false;
+ else if (__isset.replace && !(replace == rhs.replace))
+ return false;
if (!(filesAdded == rhs.filesAdded))
return false;
if (__isset.filesAddedChecksum != rhs.__isset.filesAddedChecksum)
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
index fd1dc06..354e634 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
@@ -38,8 +38,9 @@ import org.slf4j.LoggerFactory;
public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEventRequestData, InsertEventRequestData._Fields>, java.io.Serializable, Cloneable, Comparable<InsertEventRequestData> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InsertEventRequestData");
- private static final org.apache.thrift.protocol.TField FILES_ADDED_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAdded", org.apache.thrift.protocol.TType.LIST, (short)1);
- private static final org.apache.thrift.protocol.TField FILES_ADDED_CHECKSUM_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAddedChecksum", org.apache.thrift.protocol.TType.LIST, (short)2);
+ private static final org.apache.thrift.protocol.TField REPLACE_FIELD_DESC = new org.apache.thrift.protocol.TField("replace", org.apache.thrift.protocol.TType.BOOL, (short)1);
+ private static final org.apache.thrift.protocol.TField FILES_ADDED_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAdded", org.apache.thrift.protocol.TType.LIST, (short)2);
+ private static final org.apache.thrift.protocol.TField FILES_ADDED_CHECKSUM_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAddedChecksum", org.apache.thrift.protocol.TType.LIST, (short)3);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -47,13 +48,15 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
schemes.put(TupleScheme.class, new InsertEventRequestDataTupleSchemeFactory());
}
+ private boolean replace; // optional
private List<String> filesAdded; // required
private List<String> filesAddedChecksum; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- FILES_ADDED((short)1, "filesAdded"),
- FILES_ADDED_CHECKSUM((short)2, "filesAddedChecksum");
+ REPLACE((short)1, "replace"),
+ FILES_ADDED((short)2, "filesAdded"),
+ FILES_ADDED_CHECKSUM((short)3, "filesAddedChecksum");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -68,9 +71,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
- case 1: // FILES_ADDED
+ case 1: // REPLACE
+ return REPLACE;
+ case 2: // FILES_ADDED
return FILES_ADDED;
- case 2: // FILES_ADDED_CHECKSUM
+ case 3: // FILES_ADDED_CHECKSUM
return FILES_ADDED_CHECKSUM;
default:
return null;
@@ -112,10 +117,14 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
}
// isset id assignments
- private static final _Fields optionals[] = {_Fields.FILES_ADDED_CHECKSUM};
+ private static final int __REPLACE_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.REPLACE,_Fields.FILES_ADDED_CHECKSUM};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.REPLACE, new org.apache.thrift.meta_data.FieldMetaData("replace", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
tmpMap.put(_Fields.FILES_ADDED, new org.apache.thrift.meta_data.FieldMetaData("filesAdded", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
@@ -140,6 +149,8 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
* Performs a deep copy on <i>other</i>.
*/
public InsertEventRequestData(InsertEventRequestData other) {
+ __isset_bitfield = other.__isset_bitfield;
+ this.replace = other.replace;
if (other.isSetFilesAdded()) {
List<String> __this__filesAdded = new ArrayList<String>(other.filesAdded);
this.filesAdded = __this__filesAdded;
@@ -156,10 +167,34 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
@Override
public void clear() {
+ setReplaceIsSet(false);
+ this.replace = false;
this.filesAdded = null;
this.filesAddedChecksum = null;
}
+ public boolean isReplace() {
+ return this.replace;
+ }
+
+ public void setReplace(boolean replace) {
+ this.replace = replace;
+ setReplaceIsSet(true);
+ }
+
+ public void unsetReplace() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __REPLACE_ISSET_ID);
+ }
+
+ /** Returns true if field replace is set (has been assigned a value) and false otherwise */
+ public boolean isSetReplace() {
+ return EncodingUtils.testBit(__isset_bitfield, __REPLACE_ISSET_ID);
+ }
+
+ public void setReplaceIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __REPLACE_ISSET_ID, value);
+ }
+
public int getFilesAddedSize() {
return (this.filesAdded == null) ? 0 : this.filesAdded.size();
}
@@ -238,6 +273,14 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
public void setFieldValue(_Fields field, Object value) {
switch (field) {
+ case REPLACE:
+ if (value == null) {
+ unsetReplace();
+ } else {
+ setReplace((Boolean)value);
+ }
+ break;
+
case FILES_ADDED:
if (value == null) {
unsetFilesAdded();
@@ -259,6 +302,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
public Object getFieldValue(_Fields field) {
switch (field) {
+ case REPLACE:
+ return isReplace();
+
case FILES_ADDED:
return getFilesAdded();
@@ -276,6 +322,8 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
}
switch (field) {
+ case REPLACE:
+ return isSetReplace();
case FILES_ADDED:
return isSetFilesAdded();
case FILES_ADDED_CHECKSUM:
@@ -297,6 +345,15 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
if (that == null)
return false;
+ boolean this_present_replace = true && this.isSetReplace();
+ boolean that_present_replace = true && that.isSetReplace();
+ if (this_present_replace || that_present_replace) {
+ if (!(this_present_replace && that_present_replace))
+ return false;
+ if (this.replace != that.replace)
+ return false;
+ }
+
boolean this_present_filesAdded = true && this.isSetFilesAdded();
boolean that_present_filesAdded = true && that.isSetFilesAdded();
if (this_present_filesAdded || that_present_filesAdded) {
@@ -322,6 +379,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
public int hashCode() {
List<Object> list = new ArrayList<Object>();
+ boolean present_replace = true && (isSetReplace());
+ list.add(present_replace);
+ if (present_replace)
+ list.add(replace);
+
boolean present_filesAdded = true && (isSetFilesAdded());
list.add(present_filesAdded);
if (present_filesAdded)
@@ -343,6 +405,16 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
int lastComparison = 0;
+ lastComparison = Boolean.valueOf(isSetReplace()).compareTo(other.isSetReplace());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetReplace()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replace, other.replace);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
lastComparison = Boolean.valueOf(isSetFilesAdded()).compareTo(other.isSetFilesAdded());
if (lastComparison != 0) {
return lastComparison;
@@ -383,6 +455,12 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
StringBuilder sb = new StringBuilder("InsertEventRequestData(");
boolean first = true;
+ if (isSetReplace()) {
+ sb.append("replace:");
+ sb.append(this.replace);
+ first = false;
+ }
+ if (!first) sb.append(", ");
sb.append("filesAdded:");
if (this.filesAdded == null) {
sb.append("null");
@@ -423,6 +501,8 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -447,7 +527,15 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
break;
}
switch (schemeField.id) {
- case 1: // FILES_ADDED
+ case 1: // REPLACE
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.replace = iprot.readBool();
+ struct.setReplaceIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // FILES_ADDED
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list558 = iprot.readListBegin();
@@ -465,7 +553,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
- case 2: // FILES_ADDED_CHECKSUM
+ case 3: // FILES_ADDED_CHECKSUM
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list561 = iprot.readListBegin();
@@ -496,6 +584,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.isSetReplace()) {
+ oprot.writeFieldBegin(REPLACE_FIELD_DESC);
+ oprot.writeBool(struct.replace);
+ oprot.writeFieldEnd();
+ }
if (struct.filesAdded != null) {
oprot.writeFieldBegin(FILES_ADDED_FIELD_DESC);
{
@@ -547,10 +640,16 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
}
}
BitSet optionals = new BitSet();
- if (struct.isSetFilesAddedChecksum()) {
+ if (struct.isSetReplace()) {
optionals.set(0);
}
- oprot.writeBitSet(optionals, 1);
+ if (struct.isSetFilesAddedChecksum()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetReplace()) {
+ oprot.writeBool(struct.replace);
+ }
if (struct.isSetFilesAddedChecksum()) {
{
oprot.writeI32(struct.filesAddedChecksum.size());
@@ -576,8 +675,12 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
}
}
struct.setFilesAddedIsSet(true);
- BitSet incoming = iprot.readBitSet(1);
+ BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
+ struct.replace = iprot.readBool();
+ struct.setReplaceIsSet(true);
+ }
+ if (incoming.get(1)) {
{
org.apache.thrift.protocol.TList _list571 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
struct.filesAddedChecksum = new ArrayList<String>(_list571.size);
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index 2dfa1a9..4a3bf85 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -16007,6 +16007,10 @@ class InsertEventRequestData {
static $_TSPEC;
/**
+ * @var bool
+ */
+ public $replace = null;
+ /**
* @var string[]
*/
public $filesAdded = null;
@@ -16019,6 +16023,10 @@ class InsertEventRequestData {
if (!isset(self::$_TSPEC)) {
self::$_TSPEC = array(
1 => array(
+ 'var' => 'replace',
+ 'type' => TType::BOOL,
+ ),
+ 2 => array(
'var' => 'filesAdded',
'type' => TType::LST,
'etype' => TType::STRING,
@@ -16026,7 +16034,7 @@ class InsertEventRequestData {
'type' => TType::STRING,
),
),
- 2 => array(
+ 3 => array(
'var' => 'filesAddedChecksum',
'type' => TType::LST,
'etype' => TType::STRING,
@@ -16037,6 +16045,9 @@ class InsertEventRequestData {
);
}
if (is_array($vals)) {
+ if (isset($vals['replace'])) {
+ $this->replace = $vals['replace'];
+ }
if (isset($vals['filesAdded'])) {
$this->filesAdded = $vals['filesAdded'];
}
@@ -16066,6 +16077,13 @@ class InsertEventRequestData {
switch ($fid)
{
case 1:
+ if ($ftype == TType::BOOL) {
+ $xfer += $input->readBool($this->replace);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
if ($ftype == TType::LST) {
$this->filesAdded = array();
$_size495 = 0;
@@ -16082,7 +16100,7 @@ class InsertEventRequestData {
$xfer += $input->skip($ftype);
}
break;
- case 2:
+ case 3:
if ($ftype == TType::LST) {
$this->filesAddedChecksum = array();
$_size501 = 0;
@@ -16112,11 +16130,16 @@ class InsertEventRequestData {
public function write($output) {
$xfer = 0;
$xfer += $output->writeStructBegin('InsertEventRequestData');
+ if ($this->replace !== null) {
+ $xfer += $output->writeFieldBegin('replace', TType::BOOL, 1);
+ $xfer += $output->writeBool($this->replace);
+ $xfer += $output->writeFieldEnd();
+ }
if ($this->filesAdded !== null) {
if (!is_array($this->filesAdded)) {
throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
}
- $xfer += $output->writeFieldBegin('filesAdded', TType::LST, 1);
+ $xfer += $output->writeFieldBegin('filesAdded', TType::LST, 2);
{
$output->writeListBegin(TType::STRING, count($this->filesAdded));
{
@@ -16133,7 +16156,7 @@ class InsertEventRequestData {
if (!is_array($this->filesAddedChecksum)) {
throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
}
- $xfer += $output->writeFieldBegin('filesAddedChecksum', TType::LST, 2);
+ $xfer += $output->writeFieldBegin('filesAddedChecksum', TType::LST, 3);
{
$output->writeListBegin(TType::STRING, count($this->filesAddedChecksum));
{
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 3faf1bb..9480c85 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -11156,17 +11156,20 @@ class CurrentNotificationEventId:
class InsertEventRequestData:
"""
Attributes:
+ - replace
- filesAdded
- filesAddedChecksum
"""
thrift_spec = (
None, # 0
- (1, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 1
- (2, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 2
+ (1, TType.BOOL, 'replace', None, None, ), # 1
+ (2, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 2
+ (3, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 3
)
- def __init__(self, filesAdded=None, filesAddedChecksum=None,):
+ def __init__(self, replace=None, filesAdded=None, filesAddedChecksum=None,):
+ self.replace = replace
self.filesAdded = filesAdded
self.filesAddedChecksum = filesAddedChecksum
@@ -11180,6 +11183,11 @@ class InsertEventRequestData:
if ftype == TType.STOP:
break
if fid == 1:
+ if ftype == TType.BOOL:
+ self.replace = iprot.readBool()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
if ftype == TType.LIST:
self.filesAdded = []
(_etype495, _size492) = iprot.readListBegin()
@@ -11189,7 +11197,7 @@ class InsertEventRequestData:
iprot.readListEnd()
else:
iprot.skip(ftype)
- elif fid == 2:
+ elif fid == 3:
if ftype == TType.LIST:
self.filesAddedChecksum = []
(_etype501, _size498) = iprot.readListBegin()
@@ -11209,15 +11217,19 @@ class InsertEventRequestData:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('InsertEventRequestData')
+ if self.replace is not None:
+ oprot.writeFieldBegin('replace', TType.BOOL, 1)
+ oprot.writeBool(self.replace)
+ oprot.writeFieldEnd()
if self.filesAdded is not None:
- oprot.writeFieldBegin('filesAdded', TType.LIST, 1)
+ oprot.writeFieldBegin('filesAdded', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.filesAdded))
for iter504 in self.filesAdded:
oprot.writeString(iter504)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.filesAddedChecksum is not None:
- oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 2)
+ oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 3)
oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum))
for iter505 in self.filesAddedChecksum:
oprot.writeString(iter505)
@@ -11234,6 +11246,7 @@ class InsertEventRequestData:
def __hash__(self):
value = 17
+ value = (value * 31) ^ hash(self.replace)
value = (value * 31) ^ hash(self.filesAdded)
value = (value * 31) ^ hash(self.filesAddedChecksum)
return value
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 5342451..7766071 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2503,10 +2503,12 @@ end
class InsertEventRequestData
include ::Thrift::Struct, ::Thrift::Struct_Union
- FILESADDED = 1
- FILESADDEDCHECKSUM = 2
+ REPLACE = 1
+ FILESADDED = 2
+ FILESADDEDCHECKSUM = 3
FIELDS = {
+ REPLACE => {:type => ::Thrift::Types::BOOL, :name => 'replace', :optional => true},
FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}},
FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index 7bc0e04..dff1195 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -38,6 +38,7 @@ public class InsertEvent extends ListenerEvent {
private final String db;
private final String table;
private final Map<String, String> keyValues;
+ private final boolean replace;
private final List<String> files;
private List<String> fileChecksums = new ArrayList<String>();
@@ -56,6 +57,9 @@ public class InsertEvent extends ListenerEvent {
super(status, handler);
this.db = db;
this.table = table;
+
+ // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
+ this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
this.files = insertData.getFilesAdded();
GetTableRequest req = new GetTableRequest(db, table);
req.setCapabilities(HiveMetaStoreClient.TEST_VERSION);
@@ -90,6 +94,13 @@ public class InsertEvent extends ListenerEvent {
}
/**
+ * @return The replace flag: true for INSERT OVERWRITE, false for INSERT INTO (true if the caller did not set it).
+ */
+ public boolean isReplace() {
+ return replace;
+ }
+
+ /**
* Get list of files created as a result of this DML operation
*
* @return list of new files
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index 3d16721..6d146e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -37,6 +37,12 @@ public abstract class InsertMessage extends EventMessage {
public abstract String getTable();
/**
+ * Getter for the replace flag, which distinguishes INSERT OVERWRITE from INSERT INTO.
+ * @return true if the insert was an INSERT OVERWRITE (replace), false if it was an INSERT INTO.
+ */
+ public abstract boolean isReplace();
+
+ /**
* Get the map of partition keyvalues. Will be null if this insert is to a table and not a
* partition.
* @return Map of partition keyvalues, or null.
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index aa770f2..1ed7cc5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -231,9 +231,10 @@ public abstract class MessageFactory {
* @param table Name of the table the insert occurred in
* @param partVals Partition values for the partition that the insert occurred in, may be null if
* the insert was done into a non-partitioned table
+ * @param replace true if the insert was an INSERT OVERWRITE, false if it was an INSERT INTO
* @param files Iterator of file created
* @return instance of InsertMessage
*/
public abstract InsertMessage buildInsertMessage(String db, String table,
- Map<String, String> partVals, Iterator<String> files);
+ Map<String, String> partVals, boolean replace, Iterator<String> files);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index e1316a4..c059d47 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -40,6 +40,9 @@ public class JSONInsertMessage extends InsertMessage {
Long timestamp;
@JsonProperty
+ String replace;
+
+ @JsonProperty
List<String> files;
@JsonProperty
@@ -52,12 +55,13 @@ public class JSONInsertMessage extends InsertMessage {
}
public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
- Map<String, String> partKeyVals, Iterator<String> fileIter, Long timestamp) {
+ Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter, Long timestamp) {
this.server = server;
this.servicePrincipal = servicePrincipal;
this.db = db;
this.table = table;
this.timestamp = timestamp;
+ this.replace = Boolean.toString(replace);
this.partKeyVals = partKeyVals;
this.files = Lists.newArrayList(fileIter);
checkValid();
@@ -99,6 +103,9 @@ public class JSONInsertMessage extends InsertMessage {
}
@Override
+ public boolean isReplace() { return Boolean.parseBoolean(replace); }
+
+ @Override
public String toString() {
try {
return JSONMessageDeserializer.mapper.writeValueAsString(this);
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 3406afb..bb81949 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -161,10 +161,9 @@ public class JSONMessageFactory extends MessageFactory {
}
@Override
- public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
+ public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, boolean replace,
Iterator<String> fileIter) {
- return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals,
- fileIter, now());
+ return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now());
}
private long now() {
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6deea96..c66bbdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1662,7 +1662,7 @@ public class Hive {
alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
validatePartition(newTPart);
if ((null != newFiles) || replace) {
- fireInsertEvent(tbl, partSpec, newFiles);
+ fireInsertEvent(tbl, partSpec, replace, newFiles);
} else {
LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event.");
}
@@ -2056,7 +2056,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
throw new HiveException(e);
}
- fireInsertEvent(tbl, null, newFiles);
+ fireInsertEvent(tbl, null, replace, newFiles);
}
/**
@@ -2281,7 +2281,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
else {
alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath);
- fireInsertEvent(tbl, partSpec, newFiles);
+ fireInsertEvent(tbl, partSpec, true, newFiles);
}
}
if (tpart == null) {
@@ -2331,7 +2331,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
tpart.getSd().setLocation(partPath);
}
- private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
+ private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
throws HiveException {
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
LOG.debug("Firing dml insert event");
@@ -2343,6 +2343,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
FireEventRequestData data = new FireEventRequestData();
InsertEventRequestData insertData = new InsertEventRequestData();
+ insertData.setReplace(replace);
data.setInsertData(insertData);
if (newFiles != null && newFiles.size() > 0) {
for (Path p : newFiles) {
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 245c483..3a1fc70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -413,7 +413,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getOutputs(), addPartitionDesc), x.getConf());
LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
Utilities.getTableDesc(table),
- partSpec.getPartSpec(), true);
+ partSpec.getPartSpec(), replicationSpec.isReplace());
loadTableWork.setInheritTableSpecs(false);
Task<?> loadPartTask = TaskFactory.get(new MoveWork(
x.getInputs(), x.getOutputs(), loadTableWork, null, false),
@@ -921,7 +921,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
if (!replicationSpec.isMetadataOnly()) {
// repl-imports are replace-into unless the event is insert-into
- loadTable(fromURI, table, !replicationSpec.isInsert(), new Path(fromURI), replicationSpec, x);
+ loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x);
} else {
x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 3ac7746..a85ba42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -452,7 +452,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
getNewEventOnlyReplicationSpec(ev.getEventId())
);
EventHandlerFactory.handlerFor(ev).handle(context);
-
}
public static void injectNextDumpDirForTest(String dumpdir){
@@ -1223,7 +1222,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// Use for specifying object state as well as event state
private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
- return new ReplicationSpec(true, false, evState, objState, false, true, false);
+ return new ReplicationSpec(true, false, evState, objState, false, true, true);
}
// Use for replication states focused on event only, where the obj state will be the event state
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 48362a3..1ea608b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -44,7 +44,7 @@ public class ReplicationSpec {
private String currStateId = null;
private boolean isNoop = false;
private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
- private boolean isInsert = false; // default is that the import mode is replace-into
+ private boolean isReplace = true; // default is that the import mode is insert overwrite
// Key definitions related to replication
public enum KEY {
@@ -53,7 +53,7 @@ public class ReplicationSpec {
CURR_STATE_ID("repl.last.id"),
NOOP("repl.noop"),
LAZY("repl.lazy"),
- IS_INSERT("repl.is.insert")
+ IS_REPLACE("repl.is.replace")
;
private final String keyName;
@@ -136,14 +136,14 @@ public class ReplicationSpec {
public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
String eventReplicationState, String currentReplicationState,
- boolean isNoop, boolean isLazy, boolean isInsert) {
+ boolean isNoop, boolean isLazy, boolean isReplace) {
this.isInReplicationScope = isInReplicationScope;
this.isMetadataOnly = isMetadataOnly;
this.eventId = eventReplicationState;
this.currStateId = currentReplicationState;
this.isNoop = isNoop;
this.isLazy = isLazy;
- this.isInsert = isInsert;
+ this.isReplace = isReplace;
}
public ReplicationSpec(Function<String, String> keyFetcher) {
@@ -162,7 +162,7 @@ public class ReplicationSpec {
this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
- this.isInsert = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_INSERT.toString()));
+ this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
}
/**
@@ -296,12 +296,12 @@ public class ReplicationSpec {
}
/**
- * @return true if this statement refers to insert-into operation.
+ * @return true if this statement refers to an insert-overwrite (replace) operation, false if it refers to insert-into.
*/
- public boolean isInsert(){ return isInsert; }
+ public boolean isReplace(){ return isReplace; }
- public void setIsInsert(boolean isInsert){
- this.isInsert = isInsert;
+ public void setIsReplace(boolean isReplace){
+ this.isReplace = isReplace;
}
/**
@@ -370,8 +370,8 @@ public class ReplicationSpec {
return String.valueOf(isNoop());
case LAZY:
return String.valueOf(isLazy());
- case IS_INSERT:
- return String.valueOf(isInsert());
+ case IS_REPLACE:
+ return String.valueOf(isReplace());
}
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
index 1346276..e9f2a6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
@@ -51,16 +51,30 @@ public class InsertHandler extends AbstractHandler {
qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
}
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
- // Mark the replication type as insert into to avoid overwrite while import
- withinContext.replicationSpec.setIsInsert(true);
+
+ // Mark the replace type based on whether this is an INSERT INTO or an INSERT OVERWRITE operation
+ withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
qlMdTable, qlPtns,
withinContext.replicationSpec);
Iterable<String> files = insertMsg.getFiles();
if (files != null) {
+ Path dataPath;
+ if ((null == qlPtns) || qlPtns.isEmpty()) {
+ dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+ } else {
+ /*
+ * An insert into/overwrite operation may touch one or more partitions, or even partitions of multiple
+ * tables. However, a separate insert event is generated for each partition into which data is inserted,
+ * so the qlPtns list will have only one entry.
+ */
+ assert(1 == qlPtns.size());
+ dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
+ }
+
// encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext)) {
+ try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
for (String file : files) {
fileListWriter.write(file + "\n");
}
@@ -82,8 +96,7 @@ public class InsertHandler extends AbstractHandler {
);
}
- private BufferedWriter writer(Context withinContext) throws IOException {
- Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));