You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/02/13 04:40:58 UTC

hive git commit: HIVE-10562 : Add versioning/format mechanism to NOTIFICATION_LOG entries, expand MESSAGE size (Sushanth Sowmyan, reviewed by Daniel Dai)

Repository: hive
Updated Branches:
  refs/heads/master 1c49da87f -> c67a6f49d


HIVE-10562 : Add versioning/format mechanism to NOTIFICATION_LOG entries, expand MESSAGE size (Sushanth Sowmyan, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c67a6f49
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c67a6f49
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c67a6f49

Branch: refs/heads/master
Commit: c67a6f49d4b0accb5a15da778cc4650ed1878401
Parents: 1c49da8
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Tue Feb 7 11:49:20 2017 -0800
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Sun Feb 12 20:40:47 2017 -0800

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |   1 +
 .../hive/ql/TestReplicationScenarios.java       | 120 +++++++++++++++++++
 metastore/if/hive_metastore.thrift              |   1 +
 .../upgrade/derby/038-HIVE-10562.derby.sql      |  11 ++
 .../upgrade/derby/hive-schema-2.2.0.derby.sql   |   2 +-
 .../derby/upgrade-2.1.0-to-2.2.0.derby.sql      |   1 +
 .../upgrade/mssql/023-HIVE-10562.mssql.sql      |   1 +
 .../upgrade/mssql/hive-schema-2.2.0.mssql.sql   |   1 +
 .../mssql/upgrade-2.1.0-to-2.2.0.mssql.sql      |   1 +
 .../upgrade/mysql/038-HIVE-10562.mysql.sql      |   6 +
 .../upgrade/mysql/hive-schema-2.2.0.mysql.sql   |   3 +-
 .../mysql/upgrade-2.1.0-to-2.2.0.mysql.sql      |   1 +
 .../upgrade/oracle/038-HIVE-10562.oracle.sql    |   2 +
 .../upgrade/oracle/hive-schema-2.2.0.oracle.sql |   3 +-
 .../oracle/upgrade-2.1.0-to-2.2.0.oracle.sql    |   1 +
 .../postgres/037-HIVE-10562.postgres.sql        |   1 +
 .../postgres/hive-schema-2.2.0.postgres.sql     |   1 +
 .../upgrade-2.1.0-to-2.2.0.postgres.sql         |   1 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp |  22 ++++
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |  12 +-
 .../hive/metastore/api/NotificationEvent.java   | 114 +++++++++++++++++-
 .../src/gen/thrift/gen-php/metastore/Types.php  |  23 ++++
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  15 ++-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   4 +-
 .../hadoop/hive/metastore/ObjectStore.java      |   2 +
 .../hive/metastore/messaging/EventUtils.java    |  28 ++++-
 .../metastore/messaging/MessageFactory.java     |   5 -
 .../messaging/json/JSONMessageFactory.java      |   7 +-
 .../hive/metastore/model/MNotificationLog.java  |   9 ++
 metastore/src/model/package.jdo                 |   3 +
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   9 +-
 31 files changed, 385 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 4df2758..f7e3e3a 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -474,6 +474,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
 
   // Process this notification by adding it to metastore DB
   private void process(NotificationEvent event) {
+    event.setMessageFormat(msgFactory.getMessageFormat());
     if (rs != null) {
       synchronized (NOTIFICATION_TBL_LOCK) {
         LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 39834cd..4efc81d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -22,10 +22,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -45,10 +49,12 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -955,6 +961,120 @@ public class TestReplicationScenarios {
 
   }
 
+  @Test
+  public void testEventFilters(){
+    // Verify that the filters introduced by EventUtils work correctly.
+
+    // The current filters we use in ReplicationSemanticAnalyzer are as follows:
+    //    IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter(
+    //        EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern),
+    //        EventUtils.getEventBoundaryFilter(eventFrom, eventTo),
+    //        EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat()));
+    // So, we test each of those three filters, and then test andFilter itself.
+
+
+    String dbname = "testfilter_db";
+    String tblname = "testfilter_tbl";
+
+    // Test EventUtils.getDbTblNotificationFilter - this is supposed to restrict
+    // events to those that match the dbname and tblname provided to the filter.
+    // If the tblname passed in to the filter is null, then it restricts itself
+    // to dbname-matching alone.
+    IMetaStoreClient.NotificationFilter dbTblFilter = EventUtils.getDbTblNotificationFilter(dbname,tblname);
+    IMetaStoreClient.NotificationFilter dbFilter = EventUtils.getDbTblNotificationFilter(dbname,null);
+
+    assertFalse(dbTblFilter.accept(null));
+    assertTrue(dbTblFilter.accept(createDummyEvent(dbname, tblname, 0)));
+    assertFalse(dbTblFilter.accept(createDummyEvent(dbname, tblname + "extra",0)));
+    assertFalse(dbTblFilter.accept(createDummyEvent(dbname + "extra", tblname,0)));
+
+    assertFalse(dbFilter.accept(null));
+    assertTrue(dbFilter.accept(createDummyEvent(dbname, tblname,0)));
+    assertTrue(dbFilter.accept(createDummyEvent(dbname, tblname + "extra", 0)));
+    assertFalse(dbFilter.accept(createDummyEvent(dbname + "extra", tblname,0)));
+
+
+    // Test EventUtils.getEventBoundaryFilter - this is supposed to only allow events
+    // within a range specified.
+    long evBegin = 50;
+    long evEnd = 75;
+    IMetaStoreClient.NotificationFilter evRangeFilter = EventUtils.getEventBoundaryFilter(evBegin,evEnd);
+
+    assertTrue(evBegin < evEnd);
+    assertFalse(evRangeFilter.accept(null));
+    assertFalse(evRangeFilter.accept(createDummyEvent(dbname, tblname, evBegin - 1)));
+    assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evBegin)));
+    assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evBegin + 1)));
+    assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evEnd - 1)));
+    assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evEnd)));
+    assertFalse(evRangeFilter.accept(createDummyEvent(dbname, tblname, evEnd + 1)));
+
+
+    // Test EventUtils.restrictByMessageFormat - this restricts events generated to those
+    // that match a provided message format
+
+    IMetaStoreClient.NotificationFilter restrictByDefaultMessageFormat =
+        EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat());
+    IMetaStoreClient.NotificationFilter restrictByArbitraryMessageFormat =
+        EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat() + "_bogus");
+    NotificationEvent dummyEvent = createDummyEvent(dbname,tblname,0);
+
+    assertEquals(MessageFactory.getInstance().getMessageFormat(),dummyEvent.getMessageFormat());
+
+    assertFalse(restrictByDefaultMessageFormat.accept(null));
+    assertTrue(restrictByDefaultMessageFormat.accept(dummyEvent));
+    assertFalse(restrictByArbitraryMessageFormat.accept(dummyEvent));
+
+    // Test andFilter operation.
+
+    IMetaStoreClient.NotificationFilter yes = new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent notificationEvent) {
+        return true;
+      }
+    };
+
+    IMetaStoreClient.NotificationFilter no = new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent notificationEvent) {
+        return false;
+      }
+    };
+
+    assertTrue(EventUtils.andFilter(yes, yes).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(yes, no).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(no, yes).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(no, no).accept(dummyEvent));
+
+    assertTrue(EventUtils.andFilter(yes, yes, yes).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(yes, yes, no).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(yes, no, yes).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(yes, no, no).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(no, yes, yes).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(no, yes, no).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(no, no, yes).accept(dummyEvent));
+    assertFalse(EventUtils.andFilter(no, no, no).accept(dummyEvent));
+
+
+  }
+
+  private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) {
+    MessageFactory msgFactory = MessageFactory.getInstance();
+    Table t = new Table();
+    t.setDbName(dbname);
+    t.setTableName(tblname);
+    NotificationEvent event = new NotificationEvent(
+        evid,
+        (int) (System.currentTimeMillis() / 1000),
+        MessageFactory.CREATE_TABLE_EVENT,
+        msgFactory.buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()).toString()
+    );
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+    event.setMessageFormat(msgFactory.getMessageFormat());
+    return event;
+  }
+
   private String verifyAndReturnDbReplStatus(String dbName, String tblName, String prevReplDumpId, String cmd) throws IOException {
     run(cmd);
     advanceDumpDir();

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index bf80455..d056498 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -800,6 +800,7 @@ struct NotificationEvent {
     4: optional string dbName,
     5: optional string tableName,
     6: required string message,
+    7: optional string messageFormat,
 }
 
 struct NotificationEventResponse {

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/derby/038-HIVE-10562.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/038-HIVE-10562.derby.sql b/metastore/scripts/upgrade/derby/038-HIVE-10562.derby.sql
new file mode 100644
index 0000000..ab33d3a
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/038-HIVE-10562.derby.sql
@@ -0,0 +1,11 @@
+-- Step 1: Add the column for format
+ALTER TABLE "APP"."NOTIFICATION_LOG" ADD "MESSAGE_FORMAT" varchar(16);
+
+-- Step 2 : Change the type of the MESSAGE field from long varchar to clob
+ALTER TABLE "APP"."NOTIFICATION_LOG" ADD COLUMN "MESSAGE_CLOB" CLOB;
+UPDATE "APP"."NOTIFICATION_LOG" SET MESSAGE_CLOB=CAST(MESSAGE AS CLOB);
+ALTER TABLE "APP"."NOTIFICATION_LOG" DROP COLUMN MESSAGE;
+RENAME COLUMN "APP"."NOTIFICATION_LOG"."MESSAGE_CLOB" TO "MESSAGE";
+
+-- ALTER TABLE "APP"."NOTIFICATION_LOG" ALTER COLUMN "MESSAGE" SET DATA TYPE CLOB;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
index fe18089..8a3ae78 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
@@ -102,7 +102,7 @@ CREATE TABLE "APP"."FUNCS" ("FUNC_ID" BIGINT NOT NULL, "CLASS_NAME" VARCHAR(4000
 
 CREATE TABLE "APP"."FUNC_RU" ("FUNC_ID" BIGINT NOT NULL, "RESOURCE_TYPE" INTEGER NOT NULL, "RESOURCE_URI" VARCHAR(4000), "INTEGER_IDX" INTEGER NOT NULL);
 
-CREATE TABLE "APP"."NOTIFICATION_LOG" ("NL_ID" BIGINT NOT NULL, "DB_NAME" VARCHAR(128), "EVENT_ID" BIGINT NOT NULL, "EVENT_TIME" INTEGER NOT NULL, "EVENT_TYPE" VARCHAR(32) NOT NULL, "MESSAGE" LONG VARCHAR, "TBL_NAME" VARCHAR(128));
+CREATE TABLE "APP"."NOTIFICATION_LOG" ("NL_ID" BIGINT NOT NULL, "DB_NAME" VARCHAR(128), "EVENT_ID" BIGINT NOT NULL, "EVENT_TIME" INTEGER NOT NULL, "EVENT_TYPE" VARCHAR(32) NOT NULL, "MESSAGE" CLOB, "TBL_NAME" VARCHAR(128), "MESSAGE_FORMAT" VARCHAR(16));
 
 CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL, "NEXT_EVENT_ID" BIGINT NOT NULL);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql
index 699a619..e5a144c 100644
--- a/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql
@@ -1,4 +1,5 @@
 -- Upgrade MetaStore schema from 2.1.0 to 2.2.0
 RUN '037-HIVE-14496.derby.sql';
+RUN '038-HIVE-10562.derby.sql';
 
 UPDATE "APP".VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1;

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/mssql/023-HIVE-10562.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/023-HIVE-10562.mssql.sql b/metastore/scripts/upgrade/mssql/023-HIVE-10562.mssql.sql
new file mode 100644
index 0000000..fe52a15
--- /dev/null
+++ b/metastore/scripts/upgrade/mssql/023-HIVE-10562.mssql.sql
@@ -0,0 +1 @@
+ALTER TABLE NOTIFICATION_LOG ADD MESSAGE_FORMAT nvarchar(16);

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
index 7ff881c..f83911a 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
@@ -579,6 +579,7 @@ CREATE TABLE NOTIFICATION_LOG
     EVENT_TYPE nvarchar(32) NOT NULL,
     DB_NAME nvarchar(128) NULL,
     TBL_NAME nvarchar(128) NULL,
+    MESSAGE_FORMAT nvarchar(16),
     MESSAGE text NULL
 );
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql
index 55d8e9b..a4b8fda 100644
--- a/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql
@@ -1,6 +1,7 @@
 SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0' AS MESSAGE;
 
 :r 022-HIVE-14496.mssql.sql
+:r 023-HIVE-10562.mssql.sql
 
 UPDATE VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0' AS MESSAGE;

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/mysql/038-HIVE-10562.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/038-HIVE-10562.mysql.sql b/metastore/scripts/upgrade/mysql/038-HIVE-10562.mysql.sql
new file mode 100644
index 0000000..51e73ba
--- /dev/null
+++ b/metastore/scripts/upgrade/mysql/038-HIVE-10562.mysql.sql
@@ -0,0 +1,6 @@
+-- Step 1: Add the column for format
+ALTER TABLE `NOTIFICATION_LOG` ADD `MESSAGE_FORMAT` varchar(16);
+-- if MESSAGE_FORMAT is null, then it is the legacy hcat JSONMessageFactory that created this message
+
+-- Step 2 : Change the type of the MESSAGE field from mediumtext to longtext
+ALTER TABLE `NOTIFICATION_LOG` MODIFY `MESSAGE` longtext;

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
index 2009f1f..f0b9f10 100644
--- a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
@@ -798,7 +798,8 @@ CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG`
     `EVENT_TYPE` varchar(32) NOT NULL,
     `DB_NAME` varchar(128),
     `TBL_NAME` varchar(128),
-    `MESSAGE` mediumtext,
+    `MESSAGE` longtext,
+    `MESSAGE_FORMAT` varchar(16),
     PRIMARY KEY (`NL_ID`)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
index 07a002f..509c532 100644
--- a/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
@@ -1,6 +1,7 @@
 SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0' AS ' ';
 
 SOURCE 037-HIVE-14496.mysql.sql;
+SOURCE 038-HIVE-10562.mysql.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0' AS ' ';

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/oracle/038-HIVE-10562.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/038-HIVE-10562.oracle.sql b/metastore/scripts/upgrade/oracle/038-HIVE-10562.oracle.sql
new file mode 100644
index 0000000..bdf7eb8
--- /dev/null
+++ b/metastore/scripts/upgrade/oracle/038-HIVE-10562.oracle.sql
@@ -0,0 +1,2 @@
+ALTER TABLE NOTIFICATION_LOG ADD MESSAGE_FORMAT VARCHAR2(16) NULL;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
index bb5a934..1d652a8 100644
--- a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
@@ -560,7 +560,8 @@ CREATE TABLE NOTIFICATION_LOG
     EVENT_TYPE VARCHAR2(32) NOT NULL,
     DB_NAME VARCHAR2(128),
     TBL_NAME VARCHAR2(128),
-    MESSAGE CLOB NULL
+    MESSAGE CLOB NULL,
+    MESSAGE_FORMAT VARCHAR2(16) NULL
 );
 
 ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY (NL_ID);

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql
index b5e65b9..f31fda9 100644
--- a/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql
@@ -1,6 +1,7 @@
 SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0' AS Status from dual;
 
 @037-HIVE-14496.oracle.sql;
+@038-HIVE-10562.oracle.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0' AS Status from dual;

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/postgres/037-HIVE-10562.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/037-HIVE-10562.postgres.sql b/metastore/scripts/upgrade/postgres/037-HIVE-10562.postgres.sql
new file mode 100644
index 0000000..7189f21
--- /dev/null
+++ b/metastore/scripts/upgrade/postgres/037-HIVE-10562.postgres.sql
@@ -0,0 +1 @@
+ALTER TABLE "NOTIFICATION_LOG" ADD COLUMN "MESSAGE_FORMAT" VARCHAR(16) NULL;

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
index 0021df0..81deaa2 100644
--- a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
@@ -576,6 +576,7 @@ CREATE TABLE "NOTIFICATION_LOG"
     "DB_NAME" VARCHAR(128),
     "TBL_NAME" VARCHAR(128),
     "MESSAGE" text,
+    "MESSAGE_FORMAT" VARCHAR(16),
     PRIMARY KEY ("NL_ID")
 );
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql
index 0f7139a..0f64a90 100644
--- a/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql
@@ -1,6 +1,7 @@
 SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0';
 
 \i 036-HIVE-14496.postgres.sql;
+\i 037-HIVE-10562.postgres.sql;
 
 UPDATE "VERSION" SET "SCHEMA_VERSION"='2.2.0', "VERSION_COMMENT"='Hive release version 2.2.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0';

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index d605049..be8429e 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -15736,6 +15736,11 @@ void NotificationEvent::__set_message(const std::string& val) {
   this->message = val;
 }
 
+void NotificationEvent::__set_messageFormat(const std::string& val) {
+  this->messageFormat = val;
+__isset.messageFormat = true;
+}
+
 uint32_t NotificationEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
 
   apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -15809,6 +15814,14 @@ uint32_t NotificationEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
           xfer += iprot->skip(ftype);
         }
         break;
+      case 7:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->messageFormat);
+          this->__isset.messageFormat = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -15860,6 +15873,11 @@ uint32_t NotificationEvent::write(::apache::thrift::protocol::TProtocol* oprot)
   xfer += oprot->writeString(this->message);
   xfer += oprot->writeFieldEnd();
 
+  if (this->__isset.messageFormat) {
+    xfer += oprot->writeFieldBegin("messageFormat", ::apache::thrift::protocol::T_STRING, 7);
+    xfer += oprot->writeString(this->messageFormat);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -15873,6 +15891,7 @@ void swap(NotificationEvent &a, NotificationEvent &b) {
   swap(a.dbName, b.dbName);
   swap(a.tableName, b.tableName);
   swap(a.message, b.message);
+  swap(a.messageFormat, b.messageFormat);
   swap(a.__isset, b.__isset);
 }
 
@@ -15883,6 +15902,7 @@ NotificationEvent::NotificationEvent(const NotificationEvent& other639) {
   dbName = other639.dbName;
   tableName = other639.tableName;
   message = other639.message;
+  messageFormat = other639.messageFormat;
   __isset = other639.__isset;
 }
 NotificationEvent& NotificationEvent::operator=(const NotificationEvent& other640) {
@@ -15892,6 +15912,7 @@ NotificationEvent& NotificationEvent::operator=(const NotificationEvent& other64
   dbName = other640.dbName;
   tableName = other640.tableName;
   message = other640.message;
+  messageFormat = other640.messageFormat;
   __isset = other640.__isset;
   return *this;
 }
@@ -15904,6 +15925,7 @@ void NotificationEvent::printTo(std::ostream& out) const {
   out << ", " << "dbName="; (__isset.dbName ? (out << to_string(dbName)) : (out << "<null>"));
   out << ", " << "tableName="; (__isset.tableName ? (out << to_string(tableName)) : (out << "<null>"));
   out << ", " << "message=" << to_string(message);
+  out << ", " << "messageFormat="; (__isset.messageFormat ? (out << to_string(messageFormat)) : (out << "<null>"));
   out << ")";
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 4d5da71..e73333a 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -6391,9 +6391,10 @@ inline std::ostream& operator<<(std::ostream& out, const NotificationEventReques
 }
 
 typedef struct _NotificationEvent__isset {
-  _NotificationEvent__isset() : dbName(false), tableName(false) {}
+  _NotificationEvent__isset() : dbName(false), tableName(false), messageFormat(false) {}
   bool dbName :1;
   bool tableName :1;
+  bool messageFormat :1;
 } _NotificationEvent__isset;
 
 class NotificationEvent {
@@ -6401,7 +6402,7 @@ class NotificationEvent {
 
   NotificationEvent(const NotificationEvent&);
   NotificationEvent& operator=(const NotificationEvent&);
-  NotificationEvent() : eventId(0), eventTime(0), eventType(), dbName(), tableName(), message() {
+  NotificationEvent() : eventId(0), eventTime(0), eventType(), dbName(), tableName(), message(), messageFormat() {
   }
 
   virtual ~NotificationEvent() throw();
@@ -6411,6 +6412,7 @@ class NotificationEvent {
   std::string dbName;
   std::string tableName;
   std::string message;
+  std::string messageFormat;
 
   _NotificationEvent__isset __isset;
 
@@ -6426,6 +6428,8 @@ class NotificationEvent {
 
   void __set_message(const std::string& val);
 
+  void __set_messageFormat(const std::string& val);
+
   bool operator == (const NotificationEvent & rhs) const
   {
     if (!(eventId == rhs.eventId))
@@ -6444,6 +6448,10 @@ class NotificationEvent {
       return false;
     if (!(message == rhs.message))
       return false;
+    if (__isset.messageFormat != rhs.__isset.messageFormat)
+      return false;
+    else if (__isset.messageFormat && !(messageFormat == rhs.messageFormat))
+      return false;
     return true;
   }
   bool operator != (const NotificationEvent &rhs) const {

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java
index c40bb4b..8e0fb40 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java
@@ -44,6 +44,7 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
   private static final org.apache.thrift.protocol.TField DB_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("dbName", org.apache.thrift.protocol.TType.STRING, (short)4);
   private static final org.apache.thrift.protocol.TField TABLE_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("tableName", org.apache.thrift.protocol.TType.STRING, (short)5);
   private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)6);
+  private static final org.apache.thrift.protocol.TField MESSAGE_FORMAT_FIELD_DESC = new org.apache.thrift.protocol.TField("messageFormat", org.apache.thrift.protocol.TType.STRING, (short)7);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -57,6 +58,7 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
   private String dbName; // optional
   private String tableName; // optional
   private String message; // required
+  private String messageFormat; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -65,7 +67,8 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
     EVENT_TYPE((short)3, "eventType"),
     DB_NAME((short)4, "dbName"),
     TABLE_NAME((short)5, "tableName"),
-    MESSAGE((short)6, "message");
+    MESSAGE((short)6, "message"),
+    MESSAGE_FORMAT((short)7, "messageFormat");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -92,6 +95,8 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
           return TABLE_NAME;
         case 6: // MESSAGE
           return MESSAGE;
+        case 7: // MESSAGE_FORMAT
+          return MESSAGE_FORMAT;
         default:
           return null;
       }
@@ -135,7 +140,7 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
   private static final int __EVENTID_ISSET_ID = 0;
   private static final int __EVENTTIME_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.DB_NAME,_Fields.TABLE_NAME};
+  private static final _Fields optionals[] = {_Fields.DB_NAME,_Fields.TABLE_NAME,_Fields.MESSAGE_FORMAT};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -151,6 +156,8 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("message", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.MESSAGE_FORMAT, new org.apache.thrift.meta_data.FieldMetaData("messageFormat", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NotificationEvent.class, metaDataMap);
   }
@@ -192,6 +199,9 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
     if (other.isSetMessage()) {
       this.message = other.message;
     }
+    if (other.isSetMessageFormat()) {
+      this.messageFormat = other.messageFormat;
+    }
   }
 
   public NotificationEvent deepCopy() {
@@ -208,6 +218,7 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
     this.dbName = null;
     this.tableName = null;
     this.message = null;
+    this.messageFormat = null;
   }
 
   public long getEventId() {
@@ -346,6 +357,29 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
     }
   }
 
+  public String getMessageFormat() {
+    return this.messageFormat;
+  }
+
+  public void setMessageFormat(String messageFormat) {
+    this.messageFormat = messageFormat;
+  }
+
+  public void unsetMessageFormat() {
+    this.messageFormat = null;
+  }
+
+  /** Returns true if field messageFormat is set (has been assigned a value) and false otherwise */
+  public boolean isSetMessageFormat() {
+    return this.messageFormat != null;
+  }
+
+  public void setMessageFormatIsSet(boolean value) {
+    if (!value) {
+      this.messageFormat = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case EVENT_ID:
@@ -396,6 +430,14 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
       }
       break;
 
+    case MESSAGE_FORMAT:
+      if (value == null) {
+        unsetMessageFormat();
+      } else {
+        setMessageFormat((String)value);
+      }
+      break;
+
     }
   }
 
@@ -419,6 +461,9 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
     case MESSAGE:
       return getMessage();
 
+    case MESSAGE_FORMAT:
+      return getMessageFormat();
+
     }
     throw new IllegalStateException();
   }
@@ -442,6 +487,8 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
       return isSetTableName();
     case MESSAGE:
       return isSetMessage();
+    case MESSAGE_FORMAT:
+      return isSetMessageFormat();
     }
     throw new IllegalStateException();
   }
@@ -513,6 +560,15 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
         return false;
     }
 
+    boolean this_present_messageFormat = true && this.isSetMessageFormat();
+    boolean that_present_messageFormat = true && that.isSetMessageFormat();
+    if (this_present_messageFormat || that_present_messageFormat) {
+      if (!(this_present_messageFormat && that_present_messageFormat))
+        return false;
+      if (!this.messageFormat.equals(that.messageFormat))
+        return false;
+    }
+
     return true;
   }
 
@@ -550,6 +606,11 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
     if (present_message)
       list.add(message);
 
+    boolean present_messageFormat = true && (isSetMessageFormat());
+    list.add(present_messageFormat);
+    if (present_messageFormat)
+      list.add(messageFormat);
+
     return list.hashCode();
   }
 
@@ -621,6 +682,16 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetMessageFormat()).compareTo(other.isSetMessageFormat());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMessageFormat()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.messageFormat, other.messageFormat);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -684,6 +755,16 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
       sb.append(this.message);
     }
     first = false;
+    if (isSetMessageFormat()) {
+      if (!first) sb.append(", ");
+      sb.append("messageFormat:");
+      if (this.messageFormat == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.messageFormat);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -793,6 +874,14 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 7: // MESSAGE_FORMAT
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.messageFormat = iprot.readString();
+              struct.setMessageFormatIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -836,6 +925,13 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
         oprot.writeString(struct.message);
         oprot.writeFieldEnd();
       }
+      if (struct.messageFormat != null) {
+        if (struct.isSetMessageFormat()) {
+          oprot.writeFieldBegin(MESSAGE_FORMAT_FIELD_DESC);
+          oprot.writeString(struct.messageFormat);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -864,13 +960,19 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
       if (struct.isSetTableName()) {
         optionals.set(1);
       }
-      oprot.writeBitSet(optionals, 2);
+      if (struct.isSetMessageFormat()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
       if (struct.isSetDbName()) {
         oprot.writeString(struct.dbName);
       }
       if (struct.isSetTableName()) {
         oprot.writeString(struct.tableName);
       }
+      if (struct.isSetMessageFormat()) {
+        oprot.writeString(struct.messageFormat);
+      }
     }
 
     @Override
@@ -884,7 +986,7 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
       struct.setEventTypeIsSet(true);
       struct.message = iprot.readString();
       struct.setMessageIsSet(true);
-      BitSet incoming = iprot.readBitSet(2);
+      BitSet incoming = iprot.readBitSet(3);
       if (incoming.get(0)) {
         struct.dbName = iprot.readString();
         struct.setDbNameIsSet(true);
@@ -893,6 +995,10 @@ public class NotificationEvent implements org.apache.thrift.TBase<NotificationEv
         struct.tableName = iprot.readString();
         struct.setTableNameIsSet(true);
       }
+      if (incoming.get(2)) {
+        struct.messageFormat = iprot.readString();
+        struct.setMessageFormatIsSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index 103cd86..2dfa1a9 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -15639,6 +15639,10 @@ class NotificationEvent {
    * @var string
    */
   public $message = null;
+  /**
+   * @var string
+   */
+  public $messageFormat = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -15667,6 +15671,10 @@ class NotificationEvent {
           'var' => 'message',
           'type' => TType::STRING,
           ),
+        7 => array(
+          'var' => 'messageFormat',
+          'type' => TType::STRING,
+          ),
         );
     }
     if (is_array($vals)) {
@@ -15688,6 +15696,9 @@ class NotificationEvent {
       if (isset($vals['message'])) {
         $this->message = $vals['message'];
       }
+      if (isset($vals['messageFormat'])) {
+        $this->messageFormat = $vals['messageFormat'];
+      }
     }
   }
 
@@ -15752,6 +15763,13 @@ class NotificationEvent {
             $xfer += $input->skip($ftype);
           }
           break;
+        case 7:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->messageFormat);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
         default:
           $xfer += $input->skip($ftype);
           break;
@@ -15795,6 +15813,11 @@ class NotificationEvent {
       $xfer += $output->writeString($this->message);
       $xfer += $output->writeFieldEnd();
     }
+    if ($this->messageFormat !== null) {
+      $xfer += $output->writeFieldBegin('messageFormat', TType::STRING, 7);
+      $xfer += $output->writeString($this->messageFormat);
+      $xfer += $output->writeFieldEnd();
+    }
     $xfer += $output->writeFieldStop();
     $xfer += $output->writeStructEnd();
     return $xfer;

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 2f1c3cf..3faf1bb 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -10868,6 +10868,7 @@ class NotificationEvent:
    - dbName
    - tableName
    - message
+   - messageFormat
   """
 
   thrift_spec = (
@@ -10878,15 +10879,17 @@ class NotificationEvent:
     (4, TType.STRING, 'dbName', None, None, ), # 4
     (5, TType.STRING, 'tableName', None, None, ), # 5
     (6, TType.STRING, 'message', None, None, ), # 6
+    (7, TType.STRING, 'messageFormat', None, None, ), # 7
   )
 
-  def __init__(self, eventId=None, eventTime=None, eventType=None, dbName=None, tableName=None, message=None,):
+  def __init__(self, eventId=None, eventTime=None, eventType=None, dbName=None, tableName=None, message=None, messageFormat=None,):
     self.eventId = eventId
     self.eventTime = eventTime
     self.eventType = eventType
     self.dbName = dbName
     self.tableName = tableName
     self.message = message
+    self.messageFormat = messageFormat
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -10927,6 +10930,11 @@ class NotificationEvent:
           self.message = iprot.readString()
         else:
           iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRING:
+          self.messageFormat = iprot.readString()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -10961,6 +10969,10 @@ class NotificationEvent:
       oprot.writeFieldBegin('message', TType.STRING, 6)
       oprot.writeString(self.message)
       oprot.writeFieldEnd()
+    if self.messageFormat is not None:
+      oprot.writeFieldBegin('messageFormat', TType.STRING, 7)
+      oprot.writeString(self.messageFormat)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -10984,6 +10996,7 @@ class NotificationEvent:
     value = (value * 31) ^ hash(self.dbName)
     value = (value * 31) ^ hash(self.tableName)
     value = (value * 31) ^ hash(self.message)
+    value = (value * 31) ^ hash(self.messageFormat)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index b6050c6..5342451 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2443,6 +2443,7 @@ class NotificationEvent
   DBNAME = 4
   TABLENAME = 5
   MESSAGE = 6
+  MESSAGEFORMAT = 7
 
   FIELDS = {
     EVENTID => {:type => ::Thrift::Types::I64, :name => 'eventId'},
@@ -2450,7 +2451,8 @@ class NotificationEvent
     EVENTTYPE => {:type => ::Thrift::Types::STRING, :name => 'eventType'},
     DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName', :optional => true},
     TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName', :optional => true},
-    MESSAGE => {:type => ::Thrift::Types::STRING, :name => 'message'}
+    MESSAGE => {:type => ::Thrift::Types::STRING, :name => 'message'},
+    MESSAGEFORMAT => {:type => ::Thrift::Types::STRING, :name => 'messageFormat', :optional => true}
   }
 
   def struct_fields; FIELDS; end

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 778615d..c3f2e99 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -8387,6 +8387,7 @@ public class ObjectStore implements RawStore, Configurable {
     dbEntry.setDbName(entry.getDbName());
     dbEntry.setTableName(entry.getTableName());
     dbEntry.setMessage(entry.getMessage());
+    dbEntry.setMessageFormat(entry.getMessageFormat());
     return dbEntry;
   }
 
@@ -8398,6 +8399,7 @@ public class ObjectStore implements RawStore, Configurable {
     event.setDbName(dbEvent.getDbName());
     event.setTableName(dbEvent.getTableName());
     event.setMessage((dbEvent.getMessage()));
+    event.setMessageFormat(dbEvent.getMessageFormat());
     return event;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
index 927bf15..a5414d1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -63,6 +63,24 @@ public class EventUtils {
     };
   }
 
+  public static IMetaStoreClient.NotificationFilter restrictByMessageFormat(final String messageFormat){
+    return new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        if (event == null){
+          return false; // get rid of trivial case first, so that we can safely assume non-null
+        }
+        if (messageFormat == null){
+          return true; // let's say that passing null in will not do any filtering.
+        }
+        if (messageFormat.equalsIgnoreCase(event.getMessageFormat())){
+          return true;
+        }
+        return false;
+      }
+    };
+  }
+
   public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){
     return new IMetaStoreClient.NotificationFilter() {
       @Override
@@ -76,12 +94,16 @@ public class EventUtils {
   }
 
   public static IMetaStoreClient.NotificationFilter andFilter(
-      final IMetaStoreClient.NotificationFilter filter1,
-      final IMetaStoreClient.NotificationFilter filter2) {
+      final IMetaStoreClient.NotificationFilter... filters ) {
     return new IMetaStoreClient.NotificationFilter() {
       @Override
       public boolean accept(NotificationEvent event) {
-        return filter1.accept(event) && filter2.accept(event);
+        for (IMetaStoreClient.NotificationFilter filter : filters){
+          if (!filter.accept(event)){
+            return false;
+          }
+        }
+        return true;
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index c632ca4..aa770f2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -116,11 +116,6 @@ public abstract class MessageFactory {
   public abstract MessageDeserializer getDeserializer();
 
   /**
-   * Getter for version-string, corresponding to all constructed messages.
-   */
-  public abstract String getVersion();
-
-  /**
    * Getter for message-format.
    */
   public abstract String getMessageFormat();

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index a6ae8de..3406afb 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -84,13 +84,8 @@ public class JSONMessageFactory extends MessageFactory {
   }
 
   @Override
-  public String getVersion() {
-    return "0.1";
-  }
-
-  @Override
   public String getMessageFormat() {
-    return "json";
+    return "json-0.2";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java
----------------------------------------------------------------------
diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java
index aedac35..d3a166f 100644
--- a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java
+++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java
@@ -25,6 +25,7 @@ public class MNotificationLog {
   private String dbName;
   private String tableName;
   private String message;
+  private String messageFormat;
 
   public MNotificationLog() {
   }
@@ -86,4 +87,12 @@ public class MNotificationLog {
   public void setMessage(String message) {
     this.message = message;
   }
+
+  public String getMessageFormat() {
+    return messageFormat;
+  }
+
+  public void setMessageFormat(String messageFormat) {
+    this.messageFormat = messageFormat;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index daee72c..e7d26fb 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -1049,6 +1049,9 @@
       <field name="message">
         <column name="MESSAGE" jdbc-type="LONGVARCHAR"/>
       </field>
+      <field name="messageFormat">
+        <column name="MESSAGE_FORMAT" length="16" jdbc-type="VARCHAR" allows-null="true"/>
+      </field>
     </class>
 
     <!-- I tried to use a sequence here but derby didn't handle it well. -->

http://git-wip-us.apache.org/repos/asf/hive/blob/c67a6f49/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 2b327db..37baca1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -377,9 +377,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           }
         }
 
+        // TODO : instead of simply restricting by message format, we should eventually
+        // move to a jdbc-driver-style registering of message format, and picking message
+        // factory per event to decode. For now, however, since all messages have the
+        // same factory, restricting by message format is effectively a guard against
+        // older leftover data that would cause us problems.
+
         IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter(
             EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern),
-            EventUtils.getEventBoundaryFilter(eventFrom, eventTo));
+            EventUtils.getEventBoundaryFilter(eventFrom, eventTo),
+            EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat()));
 
         EventUtils.MSClientNotificationFetcher evFetcher
             = new EventUtils.MSClientNotificationFetcher(db.getMSC());