You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:05 UTC
[55/75] [abbrv] hive git commit: HIVE-20679: DDL operations on hive
might create large messages for DBNotification (Anishek Agarwal,
reviewed by Sankar Hariappan)
HIVE-20679: DDL operations on hive might create large messages for DBNotification (Anishek Agarwal, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b4302bb7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b4302bb7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b4302bb7
Branch: refs/heads/master-tez092
Commit: b4302bb7ad967f15ca1b708685b2ac669e3cf037
Parents: b829955
Author: Anishek Agarwal <an...@gmail.com>
Authored: Mon Oct 22 13:51:43 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Mon Oct 22 13:51:43 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../listener/DbNotificationListener.java | 182 +++++---
.../json/JSONCreateFunctionMessage.java | 3 +-
.../messaging/json/JSONDropFunctionMessage.java | 3 +-
.../messaging/json/JSONMessageFactory.java | 39 +-
.../listener/TestDbNotificationListener.java | 14 +-
.../TestReplAcidTablesWithJsonMessage.java | 43 ++
...eplAcrossInstancesWithJsonMessageFormat.java | 45 ++
...ncrementalLoadAcidTablesWithJsonMessage.java | 46 ++
.../ql/parse/TestReplWithJsonMessageFormat.java | 39 ++
.../hive/ql/parse/TestReplicationScenarios.java | 82 ++--
.../TestReplicationScenariosAcidTables.java | 61 +--
...TestReplicationScenariosAcrossInstances.java | 103 +++--
...ationScenariosIncrementalLoadAcidTables.java | 55 ++-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 2 +-
.../ql/cache/results/QueryResultsCache.java | 14 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 5 +-
.../repl/bootstrap/load/LoadConstraint.java | 4 +-
.../parse/repl/dump/events/AbortTxnHandler.java | 12 +-
.../events/AbstractConstraintEventHandler.java | 3 +-
.../repl/dump/events/AbstractEventHandler.java | 32 +-
.../repl/dump/events/AddForeignKeyHandler.java | 12 +-
.../events/AddNotNullConstraintHandler.java | 12 +-
.../repl/dump/events/AddPartitionHandler.java | 10 +-
.../repl/dump/events/AddPrimaryKeyHandler.java | 12 +-
.../dump/events/AddUniqueConstraintHandler.java | 13 +-
.../repl/dump/events/AllocWriteIdHandler.java | 12 +-
.../repl/dump/events/AlterDatabaseHandler.java | 12 +-
.../repl/dump/events/AlterPartitionHandler.java | 21 +-
.../repl/dump/events/AlterTableHandler.java | 18 +-
.../repl/dump/events/CommitTxnHandler.java | 28 +-
.../repl/dump/events/CreateDatabaseHandler.java | 13 +-
.../repl/dump/events/CreateFunctionHandler.java | 13 +-
.../repl/dump/events/CreateTableHandler.java | 15 +-
.../parse/repl/dump/events/DefaultHandler.java | 9 +
.../repl/dump/events/DropConstraintHandler.java | 13 +-
.../repl/dump/events/DropDatabaseHandler.java | 12 +-
.../repl/dump/events/DropFunctionHandler.java | 12 +-
.../repl/dump/events/DropPartitionHandler.java | 12 +-
.../repl/dump/events/DropTableHandler.java | 12 +-
.../repl/dump/events/EventHandlerFactory.java | 44 +-
.../parse/repl/dump/events/InsertHandler.java | 22 +-
.../parse/repl/dump/events/OpenTxnHandler.java | 12 +-
.../repl/dump/io/ConstraintsSerializer.java | 10 +-
.../load/message/AbstractMessageHandler.java | 4 +-
.../dump/events/TestEventHandlerFactory.java | 7 +-
.../hive/metastore/conf/MetastoreConf.java | 2 +-
.../hive/metastore/messaging/EventMessage.java | 64 +--
.../metastore/messaging/MessageBuilder.java | 425 ++++++++++++++++++
.../metastore/messaging/MessageEncoder.java | 27 ++
.../metastore/messaging/MessageFactory.java | 367 +++-------------
.../metastore/messaging/MessageSerializer.java | 24 ++
.../event/filters/DatabaseAndTableFilter.java | 8 +-
.../messaging/json/JSONAcidWriteMessage.java | 9 +-
.../json/JSONAddForeignKeyMessage.java | 5 +-
.../json/JSONAddNotNullConstraintMessage.java | 5 +-
.../messaging/json/JSONAddPartitionMessage.java | 11 +-
.../json/JSONAddPrimaryKeyMessage.java | 5 +-
.../json/JSONAddUniqueConstraintMessage.java | 5 +-
.../messaging/json/JSONAlterCatalogMessage.java | 9 +-
.../json/JSONAlterDatabaseMessage.java | 9 +-
.../json/JSONAlterPartitionMessage.java | 15 +-
.../messaging/json/JSONAlterTableMessage.java | 9 +-
.../messaging/json/JSONCommitTxnMessage.java | 5 +-
.../json/JSONCreateDatabaseMessage.java | 5 +-
.../json/JSONCreateFunctionMessage.java | 5 +-
.../messaging/json/JSONCreateTableMessage.java | 5 +-
.../json/JSONDropPartitionMessage.java | 5 +-
.../messaging/json/JSONDropTableMessage.java | 5 +-
.../messaging/json/JSONInsertMessage.java | 9 +-
.../messaging/json/JSONMessageEncoder.java | 70 +++
.../messaging/json/JSONMessageFactory.java | 432 -------------------
.../messaging/json/gzip/DeSerializer.java | 181 ++++++++
.../json/gzip/GzipJSONMessageEncoder.java | 68 +++
.../messaging/json/gzip/Serializer.java | 32 ++
.../hive/metastore/MetaStoreTestUtils.java | 11 +
.../ptest2/conf/deployed/master-mr2.properties | 2 +-
77 files changed, 1781 insertions(+), 1202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bcf1e9e..ed6d3d8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1176,7 +1176,7 @@ public class HiveConf extends Configuration {
*/
@Deprecated
METASTORE_EVENT_MESSAGE_FACTORY("hive.metastore.event.message.factory",
- "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory",
+ "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder",
"Factory class for making encoding and decoding messages in the events generated."),
/**
* @deprecated Use MetastoreConf.EXECUTE_SET_UGI
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 4313e12..c23aab2 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -78,9 +78,32 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
@@ -110,7 +133,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
private static CleanerThread cleaner = null;
private Configuration conf;
- private MessageFactory msgFactory;
+ private MessageEncoder msgEncoder;
//cleaner is a static object, use static synchronized to make sure its thread-safe
private static synchronized void init(Configuration conf) throws MetaException {
@@ -126,7 +149,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
super(config);
conf = config;
DbNotificationListener.init(conf);
- msgFactory = MessageFactory.getInstance();
+ msgEncoder = MessageFactory.getDefaultInstance(conf);
}
/**
@@ -172,9 +195,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
Table t = tableEvent.getTable();
FileIterator fileIter = MetaStoreUtils.isExternalTable(t)
? null : new FileIterator(t.getSd().getLocation());
+ CreateTableMessage msg =
+ MessageBuilder.getInstance().buildCreateTableMessage(t, fileIter);
NotificationEvent event =
new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(),
- msgFactory.buildCreateTableMessage(t, fileIter).toString());
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
@@ -188,9 +213,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
Table t = tableEvent.getTable();
+ DropTableMessage msg = MessageBuilder.getInstance().buildDropTableMessage(t);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory
- .buildDropTableMessage(t).toString());
+ new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
@@ -205,9 +231,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
Table before = tableEvent.getOldTable();
Table after = tableEvent.getNewTable();
+ AlterTableMessage msg = MessageBuilder.getInstance()
+ .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(),
+ tableEvent.getWriteId());
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
- .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(), tableEvent.getWriteId()).toString());
+ new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(),
+ msgEncoder.getSerializer().serialize(msg)
+ );
event.setCatName(after.isSetCatName() ? after.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(after.getDbName());
event.setTableName(after.getTableName());
@@ -320,10 +350,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
Table t = partitionEvent.getTable();
PartitionFilesIterator fileIter = MetaStoreUtils.isExternalTable(t)
? null : new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t);
- String msg = msgFactory
- .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter).toString();
- NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
+ EventMessage msg = MessageBuilder.getInstance()
+ .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter);
+ MessageSerializer serializer = msgEncoder.getSerializer();
+
+ NotificationEvent event = new NotificationEvent(0, now(),
+ EventType.ADD_PARTITION.toString(), serializer.serialize(msg));
event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
@@ -337,9 +369,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
Table t = partitionEvent.getTable();
- NotificationEvent event =
- new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory
- .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
+ DropPartitionMessage msg =
+ MessageBuilder.getInstance()
+ .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator());
+ NotificationEvent event = new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
@@ -354,10 +388,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
Partition before = partitionEvent.getOldPartition();
Partition after = partitionEvent.getNewPartition();
+ AlterPartitionMessage msg = MessageBuilder.getInstance()
+ .buildAlterPartitionMessage(partitionEvent.getTable(), before, after,
+ partitionEvent.getIsTruncateOp(),
+ partitionEvent.getWriteId());
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
- .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp(),
- partitionEvent.getWriteId()).toString());
+ new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(before.isSetCatName() ? before.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(before.getDbName());
event.setTableName(before.getTableName());
@@ -371,9 +408,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
Database db = dbEvent.getDatabase();
+ CreateDatabaseMessage msg = MessageBuilder.getInstance()
+ .buildCreateDatabaseMessage(db);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
- .buildCreateDatabaseMessage(db).toString());
+ new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(db.isSetCatalogName() ? db.getCatalogName() : DEFAULT_CATALOG_NAME);
event.setDbName(db.getName());
process(event, dbEvent);
@@ -386,9 +425,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
Database db = dbEvent.getDatabase();
+ DropDatabaseMessage msg = MessageBuilder.getInstance()
+ .buildDropDatabaseMessage(db);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
- .buildDropDatabaseMessage(db).toString());
+ new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(db.isSetCatalogName() ? db.getCatalogName() : DEFAULT_CATALOG_NAME);
event.setDbName(db.getName());
process(event, dbEvent);
@@ -402,9 +443,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
Database oldDb = dbEvent.getOldDatabase();
Database newDb = dbEvent.getNewDatabase();
+ AlterDatabaseMessage msg = MessageBuilder.getInstance()
+ .buildAlterDatabaseMessage(oldDb, newDb);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ALTER_DATABASE.toString(), msgFactory
- .buildAlterDatabaseMessage(oldDb, newDb).toString());
+ new NotificationEvent(0, now(), EventType.ALTER_DATABASE.toString(),
+ msgEncoder.getSerializer().serialize(msg)
+ );
event.setCatName(oldDb.isSetCatalogName() ? oldDb.getCatalogName() : DEFAULT_CATALOG_NAME);
event.setDbName(oldDb.getName());
process(event, dbEvent);
@@ -417,9 +461,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
Function fn = fnEvent.getFunction();
+ CreateFunctionMessage msg = MessageBuilder.getInstance()
+ .buildCreateFunctionMessage(fn);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
- .buildCreateFunctionMessage(fn).toString());
+ new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(fn.isSetCatName() ? fn.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(fn.getDbName());
process(event, fnEvent);
@@ -432,9 +478,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
Function fn = fnEvent.getFunction();
+ DropFunctionMessage msg = MessageBuilder.getInstance().buildDropFunctionMessage(fn);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
- .buildDropFunctionMessage(fn).toString());
+ new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(fn.isSetCatName() ? fn.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(fn.getDbName());
process(event, fnEvent);
@@ -481,11 +528,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onInsert(InsertEvent insertEvent) throws MetaException {
Table tableObj = insertEvent.getTableObj();
+ InsertMessage msg = MessageBuilder.getInstance().buildInsertMessage(tableObj,
+ insertEvent.getPartitionObj(), insertEvent.isReplace(),
+ new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()));
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj,
- insertEvent.getPartitionObj(), insertEvent.isReplace(),
- new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
- .toString());
+ new NotificationEvent(0, now(), EventType.INSERT.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(tableObj.isSetCatName() ? tableObj.getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(tableObj.getDbName());
event.setTableName(tableObj.getTableName());
@@ -495,10 +543,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
- OpenTxnMessage msg = msgFactory.buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
+ OpenTxnMessage msg =
+ MessageBuilder.getInstance().buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
openTxnEvent.getTxnIds().get(lastTxnIdx));
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), msg.toString());
+ new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(),
+ msgEncoder.getSerializer().serialize(msg));
try {
addNotificationLog(event, openTxnEvent, dbConn, sqlGenerator);
@@ -510,10 +560,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
throws MetaException {
+ CommitTxnMessage msg =
+ MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId());
+
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(), msgFactory.buildCommitTxnMessage(
- commitTxnEvent.getTxnId())
- .toString());
+ new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(),
+ msgEncoder.getSerializer().serialize(msg));
try {
addNotificationLog(event, commitTxnEvent, dbConn, sqlGenerator);
@@ -525,10 +577,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
throws MetaException {
+ AbortTxnMessage msg =
+ MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId());
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(), msgFactory.buildAbortTxnMessage(
- abortTxnEvent.getTxnId())
- .toString());
+ new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(),
+ msgEncoder.getSerializer().serialize(msg));
try {
addNotificationLog(event, abortTxnEvent, dbConn, sqlGenerator);
@@ -555,9 +608,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onAddPrimaryKey(AddPrimaryKeyEvent addPrimaryKeyEvent) throws MetaException {
List<SQLPrimaryKey> cols = addPrimaryKeyEvent.getPrimaryKeyCols();
if (cols.size() > 0) {
- NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ADD_PRIMARYKEY.toString(), msgFactory
- .buildAddPrimaryKeyMessage(addPrimaryKeyEvent.getPrimaryKeyCols()).toString());
+ AddPrimaryKeyMessage msg = MessageBuilder.getInstance()
+ .buildAddPrimaryKeyMessage(addPrimaryKeyEvent.getPrimaryKeyCols());
+ NotificationEvent event = new NotificationEvent(0, now(), EventType.ADD_PRIMARYKEY.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(cols.get(0).getTable_db());
event.setTableName(cols.get(0).getTable_name());
@@ -573,9 +627,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onAddForeignKey(AddForeignKeyEvent addForeignKeyEvent) throws MetaException {
List<SQLForeignKey> cols = addForeignKeyEvent.getForeignKeyCols();
if (cols.size() > 0) {
+ AddForeignKeyMessage msg = MessageBuilder.getInstance()
+ .buildAddForeignKeyMessage(addForeignKeyEvent.getForeignKeyCols());
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ADD_FOREIGNKEY.toString(), msgFactory
- .buildAddForeignKeyMessage(addForeignKeyEvent.getForeignKeyCols()).toString());
+ new NotificationEvent(0, now(), EventType.ADD_FOREIGNKEY.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(cols.get(0).getPktable_db());
event.setTableName(cols.get(0).getPktable_name());
@@ -591,9 +647,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onAddUniqueConstraint(AddUniqueConstraintEvent addUniqueConstraintEvent) throws MetaException {
List<SQLUniqueConstraint> cols = addUniqueConstraintEvent.getUniqueConstraintCols();
if (cols.size() > 0) {
+ AddUniqueConstraintMessage msg = MessageBuilder.getInstance()
+ .buildAddUniqueConstraintMessage(addUniqueConstraintEvent.getUniqueConstraintCols());
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ADD_UNIQUECONSTRAINT.toString(), msgFactory
- .buildAddUniqueConstraintMessage(addUniqueConstraintEvent.getUniqueConstraintCols()).toString());
+ new NotificationEvent(0, now(), EventType.ADD_UNIQUECONSTRAINT.toString(),
+ msgEncoder.getSerializer().serialize(msg)
+ );
event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(cols.get(0).getTable_db());
event.setTableName(cols.get(0).getTable_name());
@@ -609,9 +668,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void onAddNotNullConstraint(AddNotNullConstraintEvent addNotNullConstraintEvent) throws MetaException {
List<SQLNotNullConstraint> cols = addNotNullConstraintEvent.getNotNullConstraintCols();
if (cols.size() > 0) {
+ AddNotNullConstraintMessage msg = MessageBuilder.getInstance()
+ .buildAddNotNullConstraintMessage(addNotNullConstraintEvent.getNotNullConstraintCols());
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ADD_NOTNULLCONSTRAINT.toString(), msgFactory
- .buildAddNotNullConstraintMessage(addNotNullConstraintEvent.getNotNullConstraintCols()).toString());
+ new NotificationEvent(0, now(), EventType.ADD_NOTNULLCONSTRAINT.toString(),
+ msgEncoder.getSerializer().serialize(msg)
+ );
event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
event.setDbName(cols.get(0).getTable_db());
event.setTableName(cols.get(0).getTable_name());
@@ -628,9 +690,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
String dbName = dropConstraintEvent.getDbName();
String tableName = dropConstraintEvent.getTableName();
String constraintName = dropConstraintEvent.getConstraintName();
+ DropConstraintMessage msg = MessageBuilder.getInstance()
+ .buildDropConstraintMessage(dbName, tableName, constraintName);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(), msgFactory
- .buildDropConstraintMessage(dbName, tableName, constraintName).toString());
+ new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(),
+ msgEncoder.getSerializer().serialize(msg));
event.setCatName(dropConstraintEvent.getCatName());
event.setDbName(dbName);
event.setTableName(tableName);
@@ -646,9 +710,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
throws MetaException {
String tableName = allocWriteIdEvent.getTableName();
String dbName = allocWriteIdEvent.getDbName();
+ AllocWriteIdMessage msg = MessageBuilder.getInstance()
+ .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName);
NotificationEvent event =
- new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(), msgFactory
- .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName).toString());
+ new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(),
+ msgEncoder.getSerializer().serialize(msg)
+ );
event.setTableName(tableName);
event.setDbName(dbName);
try {
@@ -661,11 +728,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onAcidWrite(AcidWriteEvent acidWriteEvent, Connection dbConn, SQLGenerator sqlGenerator)
throws MetaException {
- AcidWriteMessage msg = msgFactory.buildAcidWriteMessage(acidWriteEvent,
+ AcidWriteMessage msg = MessageBuilder.getInstance().buildAcidWriteMessage(acidWriteEvent,
new FileChksumIterator(acidWriteEvent.getFiles(), acidWriteEvent.getChecksums(),
acidWriteEvent.getSubDirs()));
- NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(), msg.toString());
- event.setMessageFormat(msgFactory.getMessageFormat());
+ NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(),
+ msgEncoder.getSerializer().serialize(msg));
+ event.setMessageFormat(msgEncoder.getMessageFormat());
event.setDbName(acidWriteEvent.getDatabase());
event.setTableName(acidWriteEvent.getTable());
try {
@@ -848,7 +916,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
ResultSet rs = null;
try {
stmt = dbConn.createStatement();
- event.setMessageFormat(msgFactory.getMessageFormat());
+ event.setMessageFormat(msgEncoder.getMessageFormat());
if (sqlGenerator.getDbProduct() == MYSQL) {
stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
@@ -910,7 +978,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
* DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
*/
private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
- event.setMessageFormat(msgFactory.getMessageFormat());
+ event.setMessageFormat(msgEncoder.getMessageFormat());
LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
event.getMessage());
HMSHandler.getMSForConf(conf).addNotificationEvent(event);
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
index 4707d0e..17d3b73 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
@@ -20,6 +20,7 @@
package org.apache.hive.hcatalog.messaging.json;
import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hive.hcatalog.messaging.CreateFunctionMessage;
import org.apache.thrift.TException;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -46,7 +47,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage {
this.db = fn.getDbName();
this.timestamp = timestamp;
try {
- this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+ this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
} catch (TException ex) {
throw new IllegalArgumentException("Could not serialize Function object", ex);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
index 010c4a6..7fb7d1c 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
@@ -20,6 +20,7 @@
package org.apache.hive.hcatalog.messaging.json;
import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hive.hcatalog.messaging.DropFunctionMessage;
import org.apache.thrift.TException;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -46,7 +47,7 @@ public class JSONDropFunctionMessage extends DropFunctionMessage {
this.db = fn.getDbName();
this.timestamp = timestamp;
try {
- this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+ this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
} catch (TException ex) {
throw new IllegalArgumentException("Could not serialize Function object", ex);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
index ec573a3..770dd1e 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
@@ -20,16 +20,14 @@
package org.apache.hive.hcatalog.messaging.json;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
import org.apache.hive.hcatalog.messaging.AlterPartitionMessage;
import org.apache.hive.hcatalog.messaging.AlterTableMessage;
@@ -43,15 +41,9 @@ import org.apache.hive.hcatalog.messaging.DropTableMessage;
import org.apache.hive.hcatalog.messaging.InsertMessage;
import org.apache.hive.hcatalog.messaging.MessageDeserializer;
import org.apache.hive.hcatalog.messaging.MessageFactory;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
/**
* The JSON implementation of the MessageFactory. Constructs JSON implementations of
* each message-type.
@@ -111,7 +103,7 @@ public class JSONMessageFactory extends MessageFactory {
public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator<Partition> partitionsIterator) {
return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
table.getTableName(), table.getTableType(),
- getPartitionKeyValues(table, partitionsIterator), now());
+ MessageBuilder.getPartitionKeyValues(table, partitionsIterator), now());
}
@Override
@@ -119,14 +111,14 @@ public class JSONMessageFactory extends MessageFactory {
Long writeId) {
return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
before.getDbName(), before.getTableName(), table.getTableType(),
- getPartitionKeyValues(table,before), writeId, now());
+ MessageBuilder.getPartitionKeyValues(table,before), writeId, now());
}
@Override
public DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions) {
return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
table.getTableName(), table.getTableType(),
- getPartitionKeyValues(table, partitions), now());
+ MessageBuilder.getPartitionKeyValues(table, partitions), now());
}
@Override
@@ -159,27 +151,4 @@ public class JSONMessageFactory extends MessageFactory {
return System.currentTimeMillis() / 1000;
}
- private static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
- Map<String, String> partitionKeys = new LinkedHashMap<String, String>();
- for (int i=0; i<table.getPartitionKeysSize(); ++i) {
- partitionKeys.put(table.getPartitionKeys().get(i).getName(),
- partition.getValues().get(i));
- }
- return partitionKeys;
- }
-
- private static List<Map<String, String>> getPartitionKeyValues(final Table table, Iterator<Partition> iterator) {
- return Lists.newArrayList(Iterators.transform(iterator, new com.google.common.base.Function<Partition, Map<String, String>>() {
- @Override
- public Map<String, String> apply(@Nullable Partition partition) {
- return getPartitionKeyValues(table, partition);
- }
- }));
- }
-
- static String createFunctionObjJson(Function functionObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(functionObj, "UTF-8");
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index a00ea17..dc555a4 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -119,7 +120,16 @@ public class TestDbNotificationListener {
private static Map<String, String> emptyParameters = new HashMap<String, String>();
private static IMetaStoreClient msClient;
private static IDriver driver;
- private static MessageDeserializer md = null;
+ private static MessageDeserializer md;
+
+ static {
+ try {
+ md = MessageFactory.getInstance(JSONMessageEncoder.FORMAT).getDeserializer();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private int startTime;
private long firstEventId;
private final String testTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "testDbNotif").toString();
@@ -267,7 +277,7 @@ public class TestDbNotificationListener {
SessionState.start(new CliSessionState(conf));
msClient = new HiveMetaStoreClient(conf);
driver = DriverFactory.newDriver(conf);
- md = MessageFactory.getInstance().getDeserializer();
+ md = JSONMessageEncoder.getInstance().getDeserializer();
bcompat = new ReplicationV1CompatRule(msClient, conf, testsToSkipForReplV1BackwardCompatTesting );
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
new file mode 100644
index 0000000..c16799d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.Collections;
+
+public class TestReplAcidTablesWithJsonMessage extends TestReplicationScenariosAcidTables {
+ // Runs the tests inherited from the parent class, but set up with an empty map of configuration overrides.
+ @Rule
+ public TestRule replV1BackwardCompat;
+ // Same name as the parent's @BeforeClass method, which adds a GzipJSONMessageEncoder override; none is added here.
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ internalBeforeClassSetup(Collections.emptyMap(), TestReplAcidTablesWithJsonMessage.class);
+ }
+ // NOTE(review): the @Rule field is null until this runs; JUnit may read rule fields before @Before - confirm it is applied.
+ @Before
+ public void setup() throws Throwable {
+ replV1BackwardCompat = primary.getReplivationV1CompatRule(Collections.emptyList());
+ super.setup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
new file mode 100644
index 0000000..0ec0275
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class TestReplAcrossInstancesWithJsonMessageFormat
+ extends TestReplicationScenariosAcrossInstances {
+ // Runs the inherited cross-instance replication tests, set up with an empty map of configuration overrides.
+ @Rule
+ public TestRule replV1BackwardCompat;
+ // Hands this class to the shared setup, as the sibling *WithJsonMessage tests do with their own class.
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ internalBeforeClassSetup(Collections.emptyMap(), TestReplAcrossInstancesWithJsonMessageFormat.class);
+ }
+ // NOTE(review): the @Rule field is null until this runs; JUnit may read rule fields before @Before - confirm it is applied.
+ @Before
+ public void setup() throws Throwable {
+ replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
+ super.setup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
new file mode 100644
index 0000000..792ec1c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.Collections;
+
+public class TestReplIncrementalLoadAcidTablesWithJsonMessage
+ extends TestReplicationScenariosIncrementalLoadAcidTables {
+ // Runs the tests inherited from the parent class, set up with an empty map of configuration overrides.
+ @Rule
+ public TestRule replV1BackwardCompat;
+ // Presumably replaces the parent's class-level setup of the same name - TODO confirm against the parent class.
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ internalBeforeClassSetup(Collections.emptyMap(),
+ TestReplIncrementalLoadAcidTablesWithJsonMessage.class);
+ }
+ // NOTE(review): the @Rule field is null until this runs; JUnit may read rule fields before @Before - confirm it is applied.
+ @Before
+ public void setup() throws Throwable {
+ replV1BackwardCompat = primary.getReplivationV1CompatRule(Collections.emptyList());
+ super.setup();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
new file mode 100644
index 0000000..faf1ced
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
+ @Rule // built per test instance from the parent's static metaStoreClient and hconf
+ public TestRule replV1BackwardCompatibleRule =
+ new ReplicationV1CompatRule(metaStoreClient, hconf,
+ new ArrayList<>(Collections.singletonList("testEventFilters"))); // presumably the tests the rule skips - confirm
+ // Same name as the parent's @BeforeClass method, which adds a GzipJSONMessageEncoder override; none is added here.
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ internalBeforeClassSetup(Collections.emptyMap());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 9c35aa6..75cd68a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.hive.ql.parse;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.ObjectStore;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -47,13 +46,18 @@ import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.exec.MoveTask;
@@ -62,45 +66,42 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import javax.annotation.Nullable;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
public class TestReplicationScenarios {
@@ -115,18 +116,14 @@ public class TestReplicationScenarios {
private final static String TEST_PATH =
System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
- private static HiveConf hconf;
+ static HiveConf hconf;
+ static HiveMetaStoreClient metaStoreClient;
private static IDriver driver;
- private static HiveMetaStoreClient metaStoreClient;
private static String proxySettingName;
- static HiveConf hconfMirror;
- static IDriver driverMirror;
- static HiveMetaStoreClient metaStoreClientMirror;
+ private static HiveConf hconfMirror;
+ private static IDriver driverMirror;
+ private static HiveMetaStoreClient metaStoreClientMirror;
- @Rule
- public TestRule replV1BackwardCompatibleRule =
- new ReplicationV1CompatRule(metaStoreClient, hconf,
- new ArrayList<>(Arrays.asList("testEventFilters")));
// Make sure we skip backward-compat checking for those tests that don't generate events
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
@@ -141,23 +138,30 @@ public class TestReplicationScenarios {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ HashMap<String, String> overrideProperties = new HashMap<>();
+ overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+ internalBeforeClassSetup(overrideProperties);
+ }
+
+ static void internalBeforeClassSetup(Map<String, String> additionalProperties)
+ throws Exception {
hconf = new HiveConf(TestReplicationScenarios.class);
- String metastoreUri = System.getProperty("test."+HiveConf.ConfVars.METASTOREURIS.varname);
+ String metastoreUri = System.getProperty("test."+MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
if (metastoreUri != null) {
- hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+ hconf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
return;
}
- hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
+ hconf.set(MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getHiveName(),
DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
proxySettingName = "hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts";
hconf.set(proxySettingName, "*");
- MetaStoreTestUtils.startMetaStoreWithRetry(hconf);
hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
- hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hconf.set(MetastoreConf.ConfVars.THRIFT_CONNECTION_RETRIES.getHiveName(), "3");
hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
hconf.set(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
@@ -166,11 +170,17 @@ public class TestReplicationScenarios {
hconf.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
"org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
hconf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname,
- "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore");
+ "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore");
hconf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+ additionalProperties.forEach((key, value) -> {
+ hconf.set(key, value);
+ });
+
+ MetaStoreTestUtils.startMetaStoreWithRetry(hconf);
+
Path testPath = new Path(TEST_PATH);
FileSystem fs = FileSystem.get(testPath.toUri(),hconf);
fs.mkdirs(testPath);
@@ -3077,12 +3087,12 @@ public class TestReplicationScenarios {
// that match a provided message format
IMetaStoreClient.NotificationFilter restrictByDefaultMessageFormat =
- new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat());
+ new MessageFormatFilter(JSONMessageEncoder.FORMAT);
IMetaStoreClient.NotificationFilter restrictByArbitraryMessageFormat =
- new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat() + "_bogus");
+ new MessageFormatFilter(JSONMessageEncoder.FORMAT + "_bogus");
NotificationEvent dummyEvent = createDummyEvent(dbname,tblname,0);
- assertEquals(MessageFactory.getInstance().getMessageFormat(),dummyEvent.getMessageFormat());
+ assertEquals(JSONMessageEncoder.FORMAT,dummyEvent.getMessageFormat());
assertFalse(restrictByDefaultMessageFormat.accept(null));
assertTrue(restrictByDefaultMessageFormat.accept(dummyEvent));
@@ -3431,19 +3441,25 @@ public class TestReplicationScenarios {
}
private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) {
- MessageFactory msgFactory = MessageFactory.getInstance();
+ MessageEncoder msgEncoder = null;
+ try {
+ msgEncoder = MessageFactory.getInstance(JSONMessageEncoder.FORMAT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
Table t = new Table();
t.setDbName(dbname);
t.setTableName(tblname);
NotificationEvent event = new NotificationEvent(
evid,
(int)System.currentTimeMillis(),
- MessageFactory.CREATE_TABLE_EVENT,
- msgFactory.buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()).toString()
+ MessageBuilder.CREATE_TABLE_EVENT,
+ MessageBuilder.getInstance().buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator())
+ .toString()
);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- event.setMessageFormat(msgFactory.getMessageFormat());
+ event.setMessageFormat(msgEncoder.getMessageFormat());
return event;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index e043e54..4ceb9fa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,16 +17,16 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -54,13 +53,11 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Collections;
-import com.google.common.collect.Lists;
-import org.junit.Ignore;
+import java.util.Map;
import static org.junit.Assert.assertTrue;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -72,12 +69,10 @@ public class TestReplicationScenariosAcidTables {
@Rule
public final TestName testName = new TestName();
- @Rule
- public TestRule replV1BackwardCompat;
-
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
- private static WarehouseInstance primary, replica, replicaNonAcid;
- private static HiveConf conf;
+ static WarehouseInstance primary;
+ private static WarehouseInstance replica, replicaNonAcid;
+ static HiveConf conf;
private String primaryDbName, replicatedDbName, primaryDbNameExtra;
private enum OperationType {
REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, REPL_TEST_ACID_CTAS,
@@ -87,25 +82,38 @@ public class TestReplicationScenariosAcidTables {
@BeforeClass
public static void classLevelSetup() throws Exception {
- conf = new HiveConf(TestReplicationScenariosAcidTables.class);
+ HashMap<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+
+ internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class);
+ }
+
+ static void internalBeforeClassSetup(Map<String, String> overrides,
+ Class clazz) throws Exception {
+
+ conf = new HiveConf(clazz);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
- put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
- put("hive.support.concurrency", "true");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.exec.dynamic.partition.mode", "nonstrict");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ HashMap<String, String> acidEnableConf = new HashMap<String, String>() {{
+ put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+ put("hive.support.concurrency", "true");
+ put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
}};
- primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
- replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+
+ acidEnableConf.putAll(overrides);
+
+ primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "false");
@@ -123,7 +131,6 @@ public class TestReplicationScenariosAcidTables {
@Before
public void setup() throws Throwable {
- replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
replicatedDbName = "replicated_" + primaryDbName;
primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 61473a8..7e8caf0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.util.DependencyResolver;
@@ -43,7 +44,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
@@ -53,7 +53,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -72,33 +71,40 @@ import static org.junit.Assert.assertTrue;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.junit.Assert;
public class TestReplicationScenariosAcrossInstances {
@Rule
public final TestName testName = new TestName();
- @Rule
- public TestRule replV1BackwardCompat;
-
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
- private static WarehouseInstance primary, replica;
+ static WarehouseInstance primary;
+ private static WarehouseInstance replica;
private String primaryDbName, replicatedDbName;
private static HiveConf conf;
@BeforeClass
public static void classLevelSetup() throws Exception {
- conf = new HiveConf(TestReplicationScenarios.class);
+ HashMap<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+
+ internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+ }
+
+ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
+ throws Exception {
+ conf = new HiveConf(clazz);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+ Map<String, String> localOverrides = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
}};
- primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
- replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+ localOverrides.putAll(overrides);
+ primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
}
@AfterClass
@@ -109,7 +115,6 @@ public class TestReplicationScenariosAcrossInstances {
@Before
public void setup() throws Throwable {
- replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
replicatedDbName = "replicated_" + primaryDbName;
primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
@@ -323,7 +328,8 @@ public class TestReplicationScenariosAcrossInstances {
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i int, j int)")
.run("insert into table1 values (1,2)")
- .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+ .dump(primaryDbName, null,
+ Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, tuple.dumpLocation)
.run("use " + replicatedDbName)
@@ -419,7 +425,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("create table table2 (a int, city string) partitioned by (country string)")
.run("create table table3 (i int, j int)")
.run("insert into table1 values (1,2)")
- .dump(dbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+ .dump(dbName, null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, tuple.dumpLocation)
.run("use " + replicatedDbName)
@@ -433,7 +439,8 @@ public class TestReplicationScenariosAcrossInstances {
.run("alter table table1 rename to renamed_table1")
.run("insert into table2 partition(country='india') values (1,'mumbai') ")
.run("create table table4 (i int, j int)")
- .dump(dbName, tuple.lastReplicationId, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+ .dump(dbName, tuple.lastReplicationId,
+ Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, tuple.dumpLocation)
.run("use " + replicatedDbName)
@@ -467,7 +474,7 @@ public class TestReplicationScenariosAcrossInstances {
SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("use " + dbTwo)
.run("create table t1 (i int, j int)")
- .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+ .dump("`*`", null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
/*
Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
@@ -526,7 +533,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("use " + dbOne)
.run("create table t1 (i int, j int) partitioned by (load_date date) "
+ "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
- .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+ .dump("`*`", null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
String dbTwo = primaryDbName + randomTwo;
WarehouseInstance.Tuple incrementalTuple = primary
@@ -905,15 +912,20 @@ public class TestReplicationScenariosAcrossInstances {
// Incremental load to non existing db should return database not exist error.
tuple = primary.dump("someJunkDB", tuple.lastReplicationId);
- CommandProcessorResponse response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
- response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.metadata.hiveException: " +
- "database does not exist");
+ CommandProcessorResponse response =
+ replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation + "'");
+ assertTrue(response.getErrorMessage().toLowerCase()
+ .contains("org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: someJunkDB"
+ .toLowerCase()));
// Bootstrap load from an empty dump directory should return empty load directory error.
tuple = primary.dump("someJunkDB", null);
- response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
- response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.parse.semanticException:" +
- " no data to load in path");
+ response = replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation+"'");
+ assertTrue(response.getErrorMessage().toLowerCase()
+ .contains(
+ "semanticException no data to load in path"
+ .toLowerCase())
+ );
primary.run(" drop database if exists " + testDbName + " cascade");
}
@@ -935,7 +947,8 @@ public class TestReplicationScenariosAcrossInstances {
.run("insert into table3 partition(country='india') values(3)")
.dump(primaryDbName, bootstrapTuple.lastReplicationId);
- replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'"))
+ replica.load(replicatedDbName, incremental.dumpLocation,
+ Collections.singletonList("'hive.repl.approx.max.load.tasks'='10'"))
.status(replicatedDbName)
.verifyResult(incremental.lastReplicationId)
.run("use " + replicatedDbName)
@@ -959,7 +972,8 @@ public class TestReplicationScenariosAcrossInstances {
FileStatus[] fileStatus = fs.listStatus(path);
int numEvents = fileStatus.length - 1; //one is metadata file
- replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"))
+ replica.load(replicatedDbName, incremental.dumpLocation,
+ Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" })
@@ -1112,7 +1126,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("show tables")
.verifyResults(new String[] { "t1", "t2" })
.run("select id from t1")
- .verifyResults(Arrays.asList("10"))
+ .verifyResults(Collections.singletonList("10"))
.run("select country from t2 order by country")
.verifyResults(Arrays.asList("india", "uk", "us"));
verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation);
@@ -1154,9 +1168,8 @@ public class TestReplicationScenariosAcrossInstances {
// also not loaded.
BehaviourInjection<CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<CallerArguments, Boolean>() {
- @Nullable
@Override
- public Boolean apply(@Nullable CallerArguments args) {
+ public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
@@ -1197,9 +1210,8 @@ public class TestReplicationScenariosAcrossInstances {
// Verify if create table is not called on table t1 but called for t2 and t3.
// Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
- @Nullable
@Override
- public Boolean apply(@Nullable CallerArguments args) {
+ public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.funcName != null)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName));
@@ -1235,9 +1247,8 @@ public class TestReplicationScenariosAcrossInstances {
// Verify if no create table/function calls. Only add foreign key constraints on table t2.
callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
- @Nullable
@Override
- public Boolean apply(@Nullable CallerArguments args) {
+ public Boolean apply(CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null)) {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
@@ -1307,7 +1318,7 @@ public class TestReplicationScenariosAcrossInstances {
};
InjectableBehaviourObjectStore.setGetPartitionBehaviour(getPartitionStub);
- List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
+ List<String> withConfigs = Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'");
replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
InjectableBehaviourObjectStore.resetGetPartitionBehaviour(); // reset the behaviour
getPartitionStub.assertInjectionsPerformed(true, false);
@@ -1318,7 +1329,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("show tables")
.verifyResults(new String[] {"t2" })
.run("select country from t2 order by country")
- .verifyResults(Arrays.asList("india"))
+ .verifyResults(Collections.singletonList("india"))
.run("show functions like '" + replicatedDbName + "*'")
.verifyResult(replicatedDbName + ".testFunctionOne");
@@ -1378,7 +1389,8 @@ public class TestReplicationScenariosAcrossInstances {
@Test
public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable {
- List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+ List<String> withConfigs =
+ Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
String replicatedDbName_CM = replicatedDbName + "_CM";
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
@@ -1399,7 +1411,8 @@ public class TestReplicationScenariosAcrossInstances {
@Test
public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable {
- List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+ List<String> withConfigs =
+ Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
String replicatedDbName_CM = replicatedDbName + "_CM";
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
@@ -1417,16 +1430,16 @@ public class TestReplicationScenariosAcrossInstances {
testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple);
}
- private void testMoveOptimization(String primarydb, String replicadb, String replicatedDbName_CM,
+ private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
- List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+ List<String> withConfigs =
+ Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
// fail add notification for given event type.
BehaviourInjection<NotificationEvent, Boolean> callerVerifier
= new BehaviourInjection<NotificationEvent, Boolean>() {
- @Nullable
@Override
- public Boolean apply(@Nullable NotificationEvent entry) {
+ public Boolean apply(NotificationEvent entry) {
if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) {
injectionPathCalled = true;
LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName())
@@ -1440,19 +1453,19 @@ public class TestReplicationScenariosAcrossInstances {
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
- replica.loadFailure(replicadb, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicaDb, tuple.dumpLocation, withConfigs);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
callerVerifier.assertInjectionsPerformed(true, false);
- replica.load(replicadb, tuple.dumpLocation, withConfigs);
+ replica.load(replicaDb, tuple.dumpLocation, withConfigs);
- replica.run("use " + replicadb)
+ replica.run("use " + replicaDb)
.run("select country from " + tbl + " where country == 'india'")
.verifyResults(Arrays.asList("india"));
- primary.run("use " + primarydb)
+ primary.run("use " + primaryDb)
.run("drop table " + tbl);
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);