You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/04/15 00:42:37 UTC

hive git commit: HIVE-16197 : Incremental insert into a partitioned table doesn't get replicated. (Sankar Hariappan via Sushanth Sowmyan)

Repository: hive
Updated Branches:
  refs/heads/master 153e1a69b -> 98250bbe6


HIVE-16197 : Incremental insert into a partitioned table doesn't get replicated. (Sankar Hariappan via Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/98250bbe
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/98250bbe
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/98250bbe

Branch: refs/heads/master
Commit: 98250bbe611b2bbd069049af54eda10d4b079b08
Parents: 153e1a6
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Fri Apr 14 17:41:36 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Fri Apr 14 17:42:32 2017 -0700

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |   2 +-
 .../hive/ql/TestReplicationScenarios.java       |  89 +++++++++++--
 metastore/if/hive_metastore.thrift              |   5 +-
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp |  30 ++++-
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |  12 +-
 .../metastore/api/InsertEventRequestData.java   | 127 +++++++++++++++++--
 .../src/gen/thrift/gen-php/metastore/Types.php  |  31 ++++-
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  25 +++-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   6 +-
 .../hive/metastore/events/InsertEvent.java      |  11 ++
 .../hive/metastore/messaging/InsertMessage.java |   6 +
 .../metastore/messaging/MessageFactory.java     |   3 +-
 .../messaging/json/JSONInsertMessage.java       |   9 +-
 .../messaging/json/JSONMessageFactory.java      |   5 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   9 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |   4 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   3 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |  22 ++--
 .../ql/parse/repl/events/InsertHandler.java     |  23 +++-
 19 files changed, 350 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index bbfbc36..df423f0 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -439,7 +439,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
   public void onInsert(InsertEvent insertEvent) throws MetaException {
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
-            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
+            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
             new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
             .toString());
     event.setDbName(insertEvent.getDb());

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index aa2123e..ec238d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -893,13 +893,81 @@ public class TestReplicationScenarios {
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
 
     String[] unptn_data = new String[] { "eleven", "twelve" };
+
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
-    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
 
     run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
     run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
-    verifyRun("SELECT * from " + dbName + ".unptned_late", unptn_data);
+    verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data);
+
+    String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" };
+    String[] data_after_ovwrite = new String[] { "hundred" };
+    run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins);
+    run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
+
+    // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in
+    //verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+  }
+
+  @Test
+  public void testIncrementalInsertToPartition() throws IOException {
+    String testName = "incrementalInsertToPartition";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
 
     advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -910,14 +978,14 @@ public class TestReplicationScenarios {
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
-    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
-    verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data);
-    verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data);
-    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data);
+    verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
 
-    String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" };
-    run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2] + "')");
-    verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins);
+    String[] data_after_ovwrite = new String[] { "hundred" };
+    run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
 
     advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -929,7 +997,8 @@ public class TestReplicationScenarios {
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
 
-    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins);
+    // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in
+    //verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index d056498..a1bdc30 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -812,9 +812,10 @@ struct CurrentNotificationEventId {
 }
 
 struct InsertEventRequestData {
-    1: required list<string> filesAdded,
+    1: optional bool replace,
+    2: required list<string> filesAdded,
     // Checksum of files (hex string of checksum byte payload)
-    2: optional list<string> filesAddedChecksum,
+    3: optional list<string> filesAddedChecksum,
 }
 
 union FireEventRequestData {

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index be8429e..b38e1cb 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -16126,6 +16126,11 @@ InsertEventRequestData::~InsertEventRequestData() throw() {
 }
 
 
+void InsertEventRequestData::__set_replace(const bool val) {
+  this->replace = val;
+__isset.replace = true;
+}
+
 void InsertEventRequestData::__set_filesAdded(const std::vector<std::string> & val) {
   this->filesAdded = val;
 }
@@ -16158,6 +16163,14 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr
     switch (fid)
     {
       case 1:
+        if (ftype == ::apache::thrift::protocol::T_BOOL) {
+          xfer += iprot->readBool(this->replace);
+          this->__isset.replace = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 2:
         if (ftype == ::apache::thrift::protocol::T_LIST) {
           {
             this->filesAdded.clear();
@@ -16177,7 +16190,7 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr
           xfer += iprot->skip(ftype);
         }
         break;
-      case 2:
+      case 3:
         if (ftype == ::apache::thrift::protocol::T_LIST) {
           {
             this->filesAddedChecksum.clear();
@@ -16216,7 +16229,12 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
   apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
   xfer += oprot->writeStructBegin("InsertEventRequestData");
 
-  xfer += oprot->writeFieldBegin("filesAdded", ::apache::thrift::protocol::T_LIST, 1);
+  if (this->__isset.replace) {
+    xfer += oprot->writeFieldBegin("replace", ::apache::thrift::protocol::T_BOOL, 1);
+    xfer += oprot->writeBool(this->replace);
+    xfer += oprot->writeFieldEnd();
+  }
+  xfer += oprot->writeFieldBegin("filesAdded", ::apache::thrift::protocol::T_LIST, 2);
   {
     xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->filesAdded.size()));
     std::vector<std::string> ::const_iterator _iter661;
@@ -16229,7 +16247,7 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
   xfer += oprot->writeFieldEnd();
 
   if (this->__isset.filesAddedChecksum) {
-    xfer += oprot->writeFieldBegin("filesAddedChecksum", ::apache::thrift::protocol::T_LIST, 2);
+    xfer += oprot->writeFieldBegin("filesAddedChecksum", ::apache::thrift::protocol::T_LIST, 3);
     {
       xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->filesAddedChecksum.size()));
       std::vector<std::string> ::const_iterator _iter662;
@@ -16248,17 +16266,20 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
 
 void swap(InsertEventRequestData &a, InsertEventRequestData &b) {
   using ::std::swap;
+  swap(a.replace, b.replace);
   swap(a.filesAdded, b.filesAdded);
   swap(a.filesAddedChecksum, b.filesAddedChecksum);
   swap(a.__isset, b.__isset);
 }
 
 InsertEventRequestData::InsertEventRequestData(const InsertEventRequestData& other663) {
+  replace = other663.replace;
   filesAdded = other663.filesAdded;
   filesAddedChecksum = other663.filesAddedChecksum;
   __isset = other663.__isset;
 }
 InsertEventRequestData& InsertEventRequestData::operator=(const InsertEventRequestData& other664) {
+  replace = other664.replace;
   filesAdded = other664.filesAdded;
   filesAddedChecksum = other664.filesAddedChecksum;
   __isset = other664.__isset;
@@ -16267,7 +16288,8 @@ InsertEventRequestData& InsertEventRequestData::operator=(const InsertEventReque
 void InsertEventRequestData::printTo(std::ostream& out) const {
   using ::apache::thrift::to_string;
   out << "InsertEventRequestData(";
-  out << "filesAdded=" << to_string(filesAdded);
+  out << "replace="; (__isset.replace ? (out << to_string(replace)) : (out << "<null>"));
+  out << ", " << "filesAdded=" << to_string(filesAdded);
   out << ", " << "filesAddedChecksum="; (__isset.filesAddedChecksum ? (out << to_string(filesAddedChecksum)) : (out << "<null>"));
   out << ")";
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
index e73333a..50c61a7 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -6555,7 +6555,8 @@ inline std::ostream& operator<<(std::ostream& out, const CurrentNotificationEven
 }
 
 typedef struct _InsertEventRequestData__isset {
-  _InsertEventRequestData__isset() : filesAddedChecksum(false) {}
+  _InsertEventRequestData__isset() : replace(false), filesAddedChecksum(false) {}
+  bool replace :1;
   bool filesAddedChecksum :1;
 } _InsertEventRequestData__isset;
 
@@ -6564,21 +6565,28 @@ class InsertEventRequestData {
 
   InsertEventRequestData(const InsertEventRequestData&);
   InsertEventRequestData& operator=(const InsertEventRequestData&);
-  InsertEventRequestData() {
+  InsertEventRequestData() : replace(0) {
   }
 
   virtual ~InsertEventRequestData() throw();
+  bool replace;
   std::vector<std::string>  filesAdded;
   std::vector<std::string>  filesAddedChecksum;
 
   _InsertEventRequestData__isset __isset;
 
+  void __set_replace(const bool val);
+
   void __set_filesAdded(const std::vector<std::string> & val);
 
   void __set_filesAddedChecksum(const std::vector<std::string> & val);
 
   bool operator == (const InsertEventRequestData & rhs) const
   {
+    if (__isset.replace != rhs.__isset.replace)
+      return false;
+    else if (__isset.replace && !(replace == rhs.replace))
+      return false;
     if (!(filesAdded == rhs.filesAdded))
       return false;
     if (__isset.filesAddedChecksum != rhs.__isset.filesAddedChecksum)

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
index fd1dc06..354e634 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
@@ -38,8 +38,9 @@ import org.slf4j.LoggerFactory;
 public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEventRequestData, InsertEventRequestData._Fields>, java.io.Serializable, Cloneable, Comparable<InsertEventRequestData> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InsertEventRequestData");
 
-  private static final org.apache.thrift.protocol.TField FILES_ADDED_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAdded", org.apache.thrift.protocol.TType.LIST, (short)1);
-  private static final org.apache.thrift.protocol.TField FILES_ADDED_CHECKSUM_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAddedChecksum", org.apache.thrift.protocol.TType.LIST, (short)2);
+  private static final org.apache.thrift.protocol.TField REPLACE_FIELD_DESC = new org.apache.thrift.protocol.TField("replace", org.apache.thrift.protocol.TType.BOOL, (short)1);
+  private static final org.apache.thrift.protocol.TField FILES_ADDED_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAdded", org.apache.thrift.protocol.TType.LIST, (short)2);
+  private static final org.apache.thrift.protocol.TField FILES_ADDED_CHECKSUM_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAddedChecksum", org.apache.thrift.protocol.TType.LIST, (short)3);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -47,13 +48,15 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
     schemes.put(TupleScheme.class, new InsertEventRequestDataTupleSchemeFactory());
   }
 
+  private boolean replace; // optional
   private List<String> filesAdded; // required
   private List<String> filesAddedChecksum; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-    FILES_ADDED((short)1, "filesAdded"),
-    FILES_ADDED_CHECKSUM((short)2, "filesAddedChecksum");
+    REPLACE((short)1, "replace"),
+    FILES_ADDED((short)2, "filesAdded"),
+    FILES_ADDED_CHECKSUM((short)3, "filesAddedChecksum");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -68,9 +71,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
      */
     public static _Fields findByThriftId(int fieldId) {
       switch(fieldId) {
-        case 1: // FILES_ADDED
+        case 1: // REPLACE
+          return REPLACE;
+        case 2: // FILES_ADDED
           return FILES_ADDED;
-        case 2: // FILES_ADDED_CHECKSUM
+        case 3: // FILES_ADDED_CHECKSUM
           return FILES_ADDED_CHECKSUM;
         default:
           return null;
@@ -112,10 +117,14 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.FILES_ADDED_CHECKSUM};
+  private static final int __REPLACE_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.REPLACE,_Fields.FILES_ADDED_CHECKSUM};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.REPLACE, new org.apache.thrift.meta_data.FieldMetaData("replace", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     tmpMap.put(_Fields.FILES_ADDED, new org.apache.thrift.meta_data.FieldMetaData("filesAdded", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
@@ -140,6 +149,8 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
    * Performs a deep copy on <i>other</i>.
    */
   public InsertEventRequestData(InsertEventRequestData other) {
+    __isset_bitfield = other.__isset_bitfield;
+    this.replace = other.replace;
     if (other.isSetFilesAdded()) {
       List<String> __this__filesAdded = new ArrayList<String>(other.filesAdded);
       this.filesAdded = __this__filesAdded;
@@ -156,10 +167,34 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
 
   @Override
   public void clear() {
+    setReplaceIsSet(false);
+    this.replace = false;
     this.filesAdded = null;
     this.filesAddedChecksum = null;
   }
 
+  public boolean isReplace() {
+    return this.replace;
+  }
+
+  public void setReplace(boolean replace) {
+    this.replace = replace;
+    setReplaceIsSet(true);
+  }
+
+  public void unsetReplace() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __REPLACE_ISSET_ID);
+  }
+
+  /** Returns true if field replace is set (has been assigned a value) and false otherwise */
+  public boolean isSetReplace() {
+    return EncodingUtils.testBit(__isset_bitfield, __REPLACE_ISSET_ID);
+  }
+
+  public void setReplaceIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __REPLACE_ISSET_ID, value);
+  }
+
   public int getFilesAddedSize() {
     return (this.filesAdded == null) ? 0 : this.filesAdded.size();
   }
@@ -238,6 +273,14 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
 
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
+    case REPLACE:
+      if (value == null) {
+        unsetReplace();
+      } else {
+        setReplace((Boolean)value);
+      }
+      break;
+
     case FILES_ADDED:
       if (value == null) {
         unsetFilesAdded();
@@ -259,6 +302,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
 
   public Object getFieldValue(_Fields field) {
     switch (field) {
+    case REPLACE:
+      return isReplace();
+
     case FILES_ADDED:
       return getFilesAdded();
 
@@ -276,6 +322,8 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
     }
 
     switch (field) {
+    case REPLACE:
+      return isSetReplace();
     case FILES_ADDED:
       return isSetFilesAdded();
     case FILES_ADDED_CHECKSUM:
@@ -297,6 +345,15 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
     if (that == null)
       return false;
 
+    boolean this_present_replace = true && this.isSetReplace();
+    boolean that_present_replace = true && that.isSetReplace();
+    if (this_present_replace || that_present_replace) {
+      if (!(this_present_replace && that_present_replace))
+        return false;
+      if (this.replace != that.replace)
+        return false;
+    }
+
     boolean this_present_filesAdded = true && this.isSetFilesAdded();
     boolean that_present_filesAdded = true && that.isSetFilesAdded();
     if (this_present_filesAdded || that_present_filesAdded) {
@@ -322,6 +379,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
   public int hashCode() {
     List<Object> list = new ArrayList<Object>();
 
+    boolean present_replace = true && (isSetReplace());
+    list.add(present_replace);
+    if (present_replace)
+      list.add(replace);
+
     boolean present_filesAdded = true && (isSetFilesAdded());
     list.add(present_filesAdded);
     if (present_filesAdded)
@@ -343,6 +405,16 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
 
     int lastComparison = 0;
 
+    lastComparison = Boolean.valueOf(isSetReplace()).compareTo(other.isSetReplace());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetReplace()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replace, other.replace);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = Boolean.valueOf(isSetFilesAdded()).compareTo(other.isSetFilesAdded());
     if (lastComparison != 0) {
       return lastComparison;
@@ -383,6 +455,12 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
     StringBuilder sb = new StringBuilder("InsertEventRequestData(");
     boolean first = true;
 
+    if (isSetReplace()) {
+      sb.append("replace:");
+      sb.append(this.replace);
+      first = false;
+    }
+    if (!first) sb.append(", ");
     sb.append("filesAdded:");
     if (this.filesAdded == null) {
       sb.append("null");
@@ -423,6 +501,8 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
 
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);
@@ -447,7 +527,15 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
           break;
         }
         switch (schemeField.id) {
-          case 1: // FILES_ADDED
+          case 1: // REPLACE
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.replace = iprot.readBool();
+              struct.setReplaceIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // FILES_ADDED
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
                 org.apache.thrift.protocol.TList _list558 = iprot.readListBegin();
@@ -465,7 +553,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
-          case 2: // FILES_ADDED_CHECKSUM
+          case 3: // FILES_ADDED_CHECKSUM
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
                 org.apache.thrift.protocol.TList _list561 = iprot.readListBegin();
@@ -496,6 +584,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
       struct.validate();
 
       oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.isSetReplace()) {
+        oprot.writeFieldBegin(REPLACE_FIELD_DESC);
+        oprot.writeBool(struct.replace);
+        oprot.writeFieldEnd();
+      }
       if (struct.filesAdded != null) {
         oprot.writeFieldBegin(FILES_ADDED_FIELD_DESC);
         {
@@ -547,10 +640,16 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
         }
       }
       BitSet optionals = new BitSet();
-      if (struct.isSetFilesAddedChecksum()) {
+      if (struct.isSetReplace()) {
         optionals.set(0);
       }
-      oprot.writeBitSet(optionals, 1);
+      if (struct.isSetFilesAddedChecksum()) {
+        optionals.set(1);
+      }
+      oprot.writeBitSet(optionals, 2);
+      if (struct.isSetReplace()) {
+        oprot.writeBool(struct.replace);
+      }
       if (struct.isSetFilesAddedChecksum()) {
         {
           oprot.writeI32(struct.filesAddedChecksum.size());
@@ -576,8 +675,12 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
         }
       }
       struct.setFilesAddedIsSet(true);
-      BitSet incoming = iprot.readBitSet(1);
+      BitSet incoming = iprot.readBitSet(2);
       if (incoming.get(0)) {
+        struct.replace = iprot.readBool();
+        struct.setReplaceIsSet(true);
+      }
+      if (incoming.get(1)) {
         {
           org.apache.thrift.protocol.TList _list571 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
           struct.filesAddedChecksum = new ArrayList<String>(_list571.size);

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index 2dfa1a9..4a3bf85 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -16007,6 +16007,10 @@ class InsertEventRequestData {
   static $_TSPEC;
 
   /**
+   * @var bool
+   */
+  public $replace = null;
+  /**
    * @var string[]
    */
   public $filesAdded = null;
@@ -16019,6 +16023,10 @@ class InsertEventRequestData {
     if (!isset(self::$_TSPEC)) {
       self::$_TSPEC = array(
         1 => array(
+          'var' => 'replace',
+          'type' => TType::BOOL,
+          ),
+        2 => array(
           'var' => 'filesAdded',
           'type' => TType::LST,
           'etype' => TType::STRING,
@@ -16026,7 +16034,7 @@ class InsertEventRequestData {
             'type' => TType::STRING,
             ),
           ),
-        2 => array(
+        3 => array(
           'var' => 'filesAddedChecksum',
           'type' => TType::LST,
           'etype' => TType::STRING,
@@ -16037,6 +16045,9 @@ class InsertEventRequestData {
         );
     }
     if (is_array($vals)) {
+      if (isset($vals['replace'])) {
+        $this->replace = $vals['replace'];
+      }
       if (isset($vals['filesAdded'])) {
         $this->filesAdded = $vals['filesAdded'];
       }
@@ -16066,6 +16077,13 @@ class InsertEventRequestData {
       switch ($fid)
       {
         case 1:
+          if ($ftype == TType::BOOL) {
+            $xfer += $input->readBool($this->replace);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 2:
           if ($ftype == TType::LST) {
             $this->filesAdded = array();
             $_size495 = 0;
@@ -16082,7 +16100,7 @@ class InsertEventRequestData {
             $xfer += $input->skip($ftype);
           }
           break;
-        case 2:
+        case 3:
           if ($ftype == TType::LST) {
             $this->filesAddedChecksum = array();
             $_size501 = 0;
@@ -16112,11 +16130,16 @@ class InsertEventRequestData {
   public function write($output) {
     $xfer = 0;
     $xfer += $output->writeStructBegin('InsertEventRequestData');
+    if ($this->replace !== null) {
+      $xfer += $output->writeFieldBegin('replace', TType::BOOL, 1);
+      $xfer += $output->writeBool($this->replace);
+      $xfer += $output->writeFieldEnd();
+    }
     if ($this->filesAdded !== null) {
       if (!is_array($this->filesAdded)) {
         throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
       }
-      $xfer += $output->writeFieldBegin('filesAdded', TType::LST, 1);
+      $xfer += $output->writeFieldBegin('filesAdded', TType::LST, 2);
       {
         $output->writeListBegin(TType::STRING, count($this->filesAdded));
         {
@@ -16133,7 +16156,7 @@ class InsertEventRequestData {
       if (!is_array($this->filesAddedChecksum)) {
         throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
       }
-      $xfer += $output->writeFieldBegin('filesAddedChecksum', TType::LST, 2);
+      $xfer += $output->writeFieldBegin('filesAddedChecksum', TType::LST, 3);
       {
         $output->writeListBegin(TType::STRING, count($this->filesAddedChecksum));
         {

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 3faf1bb..9480c85 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -11156,17 +11156,20 @@ class CurrentNotificationEventId:
 class InsertEventRequestData:
   """
   Attributes:
+   - replace
    - filesAdded
    - filesAddedChecksum
   """
 
   thrift_spec = (
     None, # 0
-    (1, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 1
-    (2, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 2
+    (1, TType.BOOL, 'replace', None, None, ), # 1
+    (2, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 2
+    (3, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 3
   )
 
-  def __init__(self, filesAdded=None, filesAddedChecksum=None,):
+  def __init__(self, replace=None, filesAdded=None, filesAddedChecksum=None,):
+    self.replace = replace
     self.filesAdded = filesAdded
     self.filesAddedChecksum = filesAddedChecksum
 
@@ -11180,6 +11183,11 @@ class InsertEventRequestData:
       if ftype == TType.STOP:
         break
       if fid == 1:
+        if ftype == TType.BOOL:
+          self.replace = iprot.readBool()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
         if ftype == TType.LIST:
           self.filesAdded = []
           (_etype495, _size492) = iprot.readListBegin()
@@ -11189,7 +11197,7 @@ class InsertEventRequestData:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
-      elif fid == 2:
+      elif fid == 3:
         if ftype == TType.LIST:
           self.filesAddedChecksum = []
           (_etype501, _size498) = iprot.readListBegin()
@@ -11209,15 +11217,19 @@ class InsertEventRequestData:
       oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
       return
     oprot.writeStructBegin('InsertEventRequestData')
+    if self.replace is not None:
+      oprot.writeFieldBegin('replace', TType.BOOL, 1)
+      oprot.writeBool(self.replace)
+      oprot.writeFieldEnd()
     if self.filesAdded is not None:
-      oprot.writeFieldBegin('filesAdded', TType.LIST, 1)
+      oprot.writeFieldBegin('filesAdded', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.filesAdded))
       for iter504 in self.filesAdded:
         oprot.writeString(iter504)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.filesAddedChecksum is not None:
-      oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 2)
+      oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 3)
       oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum))
       for iter505 in self.filesAddedChecksum:
         oprot.writeString(iter505)
@@ -11234,6 +11246,7 @@ class InsertEventRequestData:
 
   def __hash__(self):
     value = 17
+    value = (value * 31) ^ hash(self.replace)
     value = (value * 31) ^ hash(self.filesAdded)
     value = (value * 31) ^ hash(self.filesAddedChecksum)
     return value

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 5342451..7766071 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2503,10 +2503,12 @@ end
 
 class InsertEventRequestData
   include ::Thrift::Struct, ::Thrift::Struct_Union
-  FILESADDED = 1
-  FILESADDEDCHECKSUM = 2
+  REPLACE = 1
+  FILESADDED = 2
+  FILESADDEDCHECKSUM = 3
 
   FIELDS = {
+    REPLACE => {:type => ::Thrift::Types::BOOL, :name => 'replace', :optional => true},
     FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}},
     FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index 7bc0e04..dff1195 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -38,6 +38,7 @@ public class InsertEvent extends ListenerEvent {
   private final String db;
   private final String table;
   private final Map<String, String> keyValues;
+  private final boolean replace;
   private final List<String> files;
   private List<String> fileChecksums = new ArrayList<String>();
 
@@ -56,6 +57,9 @@ public class InsertEvent extends ListenerEvent {
     super(status, handler);
     this.db = db;
     this.table = table;
+
+    // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
+    this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
     this.files = insertData.getFilesAdded();
     GetTableRequest req = new GetTableRequest(db, table);
     req.setCapabilities(HiveMetaStoreClient.TEST_VERSION);
@@ -90,6 +94,13 @@ public class InsertEvent extends ListenerEvent {
   }
 
   /**
+   * @return The replace flag.
+   */
+  public boolean isReplace() {
+    return replace;
+  }
+
+  /**
    * Get list of files created as a result of this DML operation
    *
    * @return list of new files

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index 3d16721..6d146e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -37,6 +37,12 @@ public abstract class InsertMessage extends EventMessage {
   public abstract String getTable();
 
   /**
+   * Getter for the replace flag distinguishing INSERT OVERWRITE from INSERT INTO.
+   * @return true if the operation is INSERT OVERWRITE (replace), false for INSERT INTO.
+   */
+  public abstract boolean isReplace();
+
+  /**
    * Get the map of partition keyvalues.  Will be null if this insert is to a table and not a
    * partition.
    * @return Map of partition keyvalues, or null.

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index aa770f2..1ed7cc5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -231,9 +231,10 @@ public abstract class MessageFactory {
    * @param table Name of the table the insert occurred in
    * @param partVals Partition values for the partition that the insert occurred in, may be null if
    *          the insert was done into a non-partitioned table
+   * @param replace Flag indicating whether the operation is INSERT OVERWRITE (true) or INSERT INTO (false)
    * @param files Iterator of file created
    * @return instance of InsertMessage
    */
   public abstract InsertMessage buildInsertMessage(String db, String table,
-      Map<String, String> partVals, Iterator<String> files);
+      Map<String, String> partVals, boolean replace, Iterator<String> files);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index e1316a4..c059d47 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -40,6 +40,9 @@ public class JSONInsertMessage extends InsertMessage {
   Long timestamp;
 
   @JsonProperty
+  String replace;
+
+  @JsonProperty
   List<String> files;
 
   @JsonProperty
@@ -52,12 +55,13 @@ public class JSONInsertMessage extends InsertMessage {
   }
 
   public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
-      Map<String, String> partKeyVals, Iterator<String> fileIter, Long timestamp) {
+      Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = db;
     this.table = table;
     this.timestamp = timestamp;
+    this.replace = Boolean.toString(replace);
     this.partKeyVals = partKeyVals;
     this.files = Lists.newArrayList(fileIter);
     checkValid();
@@ -99,6 +103,9 @@ public class JSONInsertMessage extends InsertMessage {
   }
 
   @Override
+  public boolean isReplace() { return Boolean.parseBoolean(replace); }
+
+  @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 3406afb..bb81949 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -161,10 +161,9 @@ public class JSONMessageFactory extends MessageFactory {
   }
 
   @Override
-  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
+  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, boolean replace,
       Iterator<String> fileIter) {
-    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals,
-        fileIter, now());
+    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now());
   }
 
   private long now() {

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6deea96..c66bbdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1662,7 +1662,7 @@ public class Hive {
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
       validatePartition(newTPart);
       if ((null != newFiles) || replace) {
-        fireInsertEvent(tbl, partSpec, newFiles);
+        fireInsertEvent(tbl, partSpec, replace, newFiles);
       } else {
         LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event.");
       }
@@ -2056,7 +2056,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
 
-    fireInsertEvent(tbl, null, newFiles);
+    fireInsertEvent(tbl, null, replace, newFiles);
   }
 
   /**
@@ -2281,7 +2281,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         }
         else {
           alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath);
-          fireInsertEvent(tbl, partSpec, newFiles);
+          fireInsertEvent(tbl, partSpec, true, newFiles);
         }
       }
       if (tpart == null) {
@@ -2331,7 +2331,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     tpart.getSd().setLocation(partPath);
   }
 
-  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
+  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
       throws HiveException {
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
       LOG.debug("Firing dml insert event");
@@ -2343,6 +2343,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
         FireEventRequestData data = new FireEventRequestData();
         InsertEventRequestData insertData = new InsertEventRequestData();
+        insertData.setReplace(replace);
         data.setInsertData(insertData);
         if (newFiles != null && newFiles.size() > 0) {
           for (Path p : newFiles) {

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 245c483..3a1fc70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -413,7 +413,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           x.getOutputs(), addPartitionDesc), x.getConf());
       LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
           Utilities.getTableDesc(table),
-          partSpec.getPartSpec(), true);
+          partSpec.getPartSpec(), replicationSpec.isReplace());
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
           x.getInputs(), x.getOutputs(), loadTableWork, null, false),
@@ -921,7 +921,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
-          loadTable(fromURI, table, !replicationSpec.isInsert(), new Path(fromURI), replicationSpec, x);
+          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 3ac7746..a85ba42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -452,7 +452,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         getNewEventOnlyReplicationSpec(ev.getEventId())
     );
     EventHandlerFactory.handlerFor(ev).handle(context);
-
   }
 
   public static void injectNextDumpDirForTest(String dumpdir){
@@ -1223,7 +1222,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   // Use for specifying object state as well as event state
   private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
-    return new ReplicationSpec(true, false, evState, objState, false, true, false);
+    return new ReplicationSpec(true, false, evState, objState, false, true, true);
   }
 
   // Use for replication states focused on event only, where the obj state will be the event state

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 48362a3..1ea608b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -44,7 +44,7 @@ public class ReplicationSpec {
   private String currStateId = null;
   private boolean isNoop = false;
   private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
-  private boolean isInsert = false; // default is that the import mode is replace-into
+  private boolean isReplace = true; // default is that the import mode is insert overwrite
 
   // Key definitions related to replication
   public enum KEY {
@@ -53,7 +53,7 @@ public class ReplicationSpec {
     CURR_STATE_ID("repl.last.id"),
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
-    IS_INSERT("repl.is.insert")
+    IS_REPLACE("repl.is.replace")
     ;
     private final String keyName;
 
@@ -136,14 +136,14 @@ public class ReplicationSpec {
 
   public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
                          String eventReplicationState, String currentReplicationState,
-                         boolean isNoop, boolean isLazy, boolean isInsert) {
+                         boolean isNoop, boolean isLazy, boolean isReplace) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
     this.isLazy = isLazy;
-    this.isInsert = isInsert;
+    this.isReplace = isReplace;
   }
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
@@ -162,7 +162,7 @@ public class ReplicationSpec {
     this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
     this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
     this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
-    this.isInsert = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_INSERT.toString()));
+    this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
   }
 
   /**
@@ -296,12 +296,12 @@ public class ReplicationSpec {
   }
 
   /**
-   * @return true if this statement refers to insert-into operation.
+   * @return true if this statement refers to an insert-overwrite (replace-into) operation.
    */
-  public boolean isInsert(){ return isInsert; }
+  public boolean isReplace(){ return isReplace; }
 
-  public void setIsInsert(boolean isInsert){
-    this.isInsert = isInsert;
+  public void setIsReplace(boolean isReplace){
+    this.isReplace = isReplace;
   }
 
   /**
@@ -370,8 +370,8 @@ public class ReplicationSpec {
         return String.valueOf(isNoop());
       case LAZY:
         return String.valueOf(isLazy());
-      case IS_INSERT:
-        return String.valueOf(isInsert());
+      case IS_REPLACE:
+        return String.valueOf(isReplace());
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/98250bbe/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
index 1346276..e9f2a6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
@@ -51,16 +51,30 @@ public class InsertHandler extends AbstractHandler {
       qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
     }
     Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-    // Mark the replication type as insert into to avoid overwrite while import
-    withinContext.replicationSpec.setIsInsert(true);
+
+    // Mark the replace type based on INSERT-INTO or INSERT-OVERWRITE operation
+    withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
     EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
         qlMdTable, qlPtns,
         withinContext.replicationSpec);
     Iterable<String> files = insertMsg.getFiles();
 
     if (files != null) {
+      Path dataPath;
+      if ((null == qlPtns) || qlPtns.isEmpty()) {
+        dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      } else {
+        /*
+         * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple
+         * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list
+         * will have only one entry.
+         */
+        assert(1 == qlPtns.size());
+        dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
+      }
+
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext)) {
+      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
         for (String file : files) {
           fileListWriter.write(file + "\n");
         }
@@ -82,8 +96,7 @@ public class InsertHandler extends AbstractHandler {
     );
   }
 
-  private BufferedWriter writer(Context withinContext) throws IOException {
-    Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
     Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
     FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
     return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));