You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:04 UTC
[54/75] [abbrv] hive git commit: HIVE-20679: DDL operations on hive
might create large messages for DBNotification (Anishek Agarwal,
reviewed by Sankar Hariappan)
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 3fe8b58..314ca48 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -17,32 +17,19 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+
import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -50,13 +37,11 @@ import org.junit.BeforeClass;
import org.junit.AfterClass;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import javax.annotation.Nullable;
-import java.util.Collections;
+import java.util.Map;
+
import com.google.common.collect.Lists;
-import org.junit.Ignore;
/**
* TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -65,11 +50,9 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
@Rule
public final TestName testName = new TestName();
- @Rule
- public TestRule replV1BackwardCompat;
-
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosIncrementalLoadAcidTables.class);
- private static WarehouseInstance primary, replica, replicaNonAcid;
+ static WarehouseInstance primary;
+ private static WarehouseInstance replica, replicaNonAcid;
private static HiveConf conf;
private String primaryDbName, replicatedDbName, primaryDbNameExtra;
private enum OperationType {
@@ -80,12 +63,21 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
@BeforeClass
public static void classLevelSetup() throws Exception {
- conf = new HiveConf(TestReplicationScenariosAcidTables.class);
+ HashMap<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+
+ internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class);
+ }
+
+ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
+ throws Exception {
+ conf = new HiveConf(clazz);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+ HashMap<String, String> acidConfs = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "true");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
@@ -97,9 +89,11 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
put("mapred.input.dir.recursive", "true");
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
}};
- primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
- replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
- HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
+
+ acidConfs.putAll(overrides);
+ primary = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
+ Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "false");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
@@ -116,7 +110,6 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
@Before
public void setup() throws Throwable {
- replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
replicatedDbName = "replicated_" + primaryDbName;
primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 1e3478d..aae7bd7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -100,7 +100,7 @@ public class WarehouseInstance implements Closeable {
initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
}
- public WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+ WarehouseInstance(Logger logger, MiniDFSCluster cluster,
Map<String, String> overridesForHiveConf) throws Exception {
this(logger, cluster, overridesForHiveConf, null);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 66f3b78..a51b7e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.Entity.Type;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -997,12 +997,12 @@ public final class QueryResultsCache {
String tableName;
switch (event.getEventType()) {
- case MessageFactory.ADD_PARTITION_EVENT:
- case MessageFactory.ALTER_PARTITION_EVENT:
- case MessageFactory.DROP_PARTITION_EVENT:
- case MessageFactory.ALTER_TABLE_EVENT:
- case MessageFactory.DROP_TABLE_EVENT:
- case MessageFactory.INSERT_EVENT:
+ case MessageBuilder.ADD_PARTITION_EVENT:
+ case MessageBuilder.ALTER_PARTITION_EVENT:
+ case MessageBuilder.DROP_PARTITION_EVENT:
+ case MessageBuilder.ALTER_TABLE_EVENT:
+ case MessageBuilder.DROP_TABLE_EVENT:
+ case MessageBuilder.INSERT_EVENT:
dbName = event.getDbName();
tableName = event.getTableName();
break;
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 4872080..c75bde5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -31,11 +31,9 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -156,8 +154,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
- new EventBoundaryFilter(work.eventFrom, work.eventTo),
- new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
+ new EventBoundaryFilter(work.eventFrom, work.eventTo));
EventUtils.MSClientNotificationFetcher evFetcher
= new EventUtils.MSClientNotificationFetcher(hiveDb);
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index d09b98c..8747727 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
@@ -63,7 +63,7 @@ public class LoadConstraint {
private final ConstraintEvent event;
private final String dbNameToLoadIn;
private final TaskTracker tracker;
- private final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+ private final MessageDeserializer deserializer = JSONMessageEncoder.getInstance().getDeserializer();
public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn,
TaskTracker existingTracker) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
index b9a5d21..5db3f26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
@@ -18,20 +18,26 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class AbortTxnHandler extends AbstractEventHandler {
+class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> {
AbortTxnHandler(NotificationEvent event) {
super(event);
}
@Override
+ AbortTxnMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAbortTxnMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
index 3ed005c..672f402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
@@ -18,9 +18,10 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-abstract class AbstractConstraintEventHandler extends AbstractEventHandler {
+abstract class AbstractConstraintEventHandler<T extends EventMessage> extends AbstractEventHandler<T> {
AbstractConstraintEventHandler(NotificationEvent event) {
super(event);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index a70c673..b996703 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -18,20 +18,48 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class AbstractEventHandler implements EventHandler {
+abstract class AbstractEventHandler<T extends EventMessage> implements EventHandler {
static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class);
+ static final MessageEncoder jsonMessageEncoder = JSONMessageEncoder.getInstance();
final NotificationEvent event;
final MessageDeserializer deserializer;
+ final String eventMessageAsJSON;
+ final T eventMessage;
AbstractEventHandler(NotificationEvent event) {
this.event = event;
- deserializer = MessageFactory.getInstance().getDeserializer();
+ try {
+ deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
+ } catch (Exception e) {
+ String message =
+ "could not create appropriate messageFactory for format " + event.getMessageFormat();
+ LOG.error(message, e);
+ throw new IllegalStateException(message, e);
+ }
+ eventMessage = eventMessage(event.getMessage());
+ eventMessageAsJSON = eventMessageAsJSON(eventMessage);
+ }
+
+ /**
+ * This takes in the string representation of the message in the format as specified in rdbms backing metastore.
+ */
+ abstract T eventMessage(String stringRepresentation);
+
+ private String eventMessageAsJSON(T eventMessage) {
+ if (eventMessage == null) {
+ // this will only happen in case DefaultHandler is invoked
+ return null;
+ }
+ return jsonMessageEncoder.getSerializer().serialize(eventMessage);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
index 8fdf2f1..736a162 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java
@@ -18,21 +18,27 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-public class AddForeignKeyHandler extends AbstractConstraintEventHandler {
+public class AddForeignKeyHandler extends AbstractConstraintEventHandler<AddForeignKeyMessage> {
AddForeignKeyHandler(NotificationEvent event) {
super(event);
}
@Override
+ AddForeignKeyMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAddForeignKeyMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
LOG.debug("Processing#{} ADD_FOREIGNKEY_MESSAGE message : {}", fromEventId(),
- event.getMessage());
+ eventMessageAsJSON);
if (shouldReplicate(withinContext)) {
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
index 335d4e6..c778198 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java
@@ -18,22 +18,28 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-public class AddNotNullConstraintHandler extends AbstractConstraintEventHandler {
+public class AddNotNullConstraintHandler extends AbstractConstraintEventHandler<AddNotNullConstraintMessage> {
AddNotNullConstraintHandler(NotificationEvent event) {
super(event);
}
@Override
+ AddNotNullConstraintMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAddNotNullConstraintMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
LOG.debug("Processing#{} ADD_NOTNULLCONSTRAINT_MESSAGE message : {}", fromEventId(),
- event.getMessage());
+ eventMessageAsJSON);
if (shouldReplicate(withinContext)) {
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 973a65b..5c16887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -42,10 +43,15 @@ class AddPartitionHandler extends AbstractEventHandler {
}
@Override
+ EventMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAddPartitionMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), eventMessageAsJSON);
- AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
+ AddPartitionMessage apm = (AddPartitionMessage) eventMessage;
org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
if (tobj == null) {
LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
index cf45c68..f9c08c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java
@@ -18,22 +18,28 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-public class AddPrimaryKeyHandler extends AbstractConstraintEventHandler {
+public class AddPrimaryKeyHandler extends AbstractConstraintEventHandler<AddPrimaryKeyMessage> {
AddPrimaryKeyHandler(NotificationEvent event) {
super(event);
}
@Override
+ AddPrimaryKeyMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAddPrimaryKeyMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
LOG.debug("Processing#{} ADD_PRIMARYKEY_MESSAGE message : {}", fromEventId(),
- event.getMessage());
+ eventMessageAsJSON);
if (shouldReplicate(withinContext)) {
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
index 58835a0..69caf08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java
@@ -18,22 +18,29 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-public class AddUniqueConstraintHandler extends AbstractConstraintEventHandler {
+public class AddUniqueConstraintHandler
+ extends AbstractConstraintEventHandler<AddUniqueConstraintMessage> {
AddUniqueConstraintHandler(NotificationEvent event) {
super(event);
}
@Override
+ AddUniqueConstraintMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAddUniqueConstraintMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
LOG.debug("Processing#{} ADD_UNIQUECONSTRAINT_MESSAGE message : {}", fromEventId(),
- event.getMessage());
+ eventMessageAsJSON);
if (shouldReplicate(withinContext)) {
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
index 38efbd7..7602d1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -18,19 +18,25 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class AllocWriteIdHandler extends AbstractEventHandler {
+class AllocWriteIdHandler extends AbstractEventHandler<AllocWriteIdMessage> {
AllocWriteIdHandler(NotificationEvent event) {
super(event);
}
@Override
+ AllocWriteIdMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAllocWriteIdMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
index 3863c59..a31d1b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java
@@ -18,20 +18,26 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class AlterDatabaseHandler extends AbstractEventHandler {
+class AlterDatabaseHandler extends AbstractEventHandler<AlterDatabaseMessage> {
AlterDatabaseHandler(NotificationEvent event) {
super(event);
}
@Override
+ AlterDatabaseMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAlterDatabaseMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} ALTER_DATABASE message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} ALTER_DATABASE message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index cde4eed..d81408e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -23,17 +23,15 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-class AlterPartitionHandler extends AbstractEventHandler {
+class AlterPartitionHandler extends AbstractEventHandler<AlterPartitionMessage> {
private final org.apache.hadoop.hive.metastore.api.Partition after;
private final org.apache.hadoop.hive.metastore.api.Table tableObject;
private final boolean isTruncateOp;
@@ -41,7 +39,7 @@ class AlterPartitionHandler extends AbstractEventHandler {
AlterPartitionHandler(NotificationEvent event) throws Exception {
super(event);
- AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
+ AlterPartitionMessage apm = eventMessage;
tableObject = apm.getTableObj();
org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
after = apm.getPtnObjAfter();
@@ -49,6 +47,11 @@ class AlterPartitionHandler extends AbstractEventHandler {
scenario = scenarioType(before, after);
}
+ @Override
+ AlterPartitionMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAlterPartitionMessage(stringRepresentation);
+ }
+
private enum Scenario {
ALTER {
@Override
@@ -86,7 +89,7 @@ class AlterPartitionHandler extends AbstractEventHandler {
@Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), eventMessageAsJSON);
Table qlMdTable = new Table(tableObject);
if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
@@ -107,7 +110,7 @@ class AlterPartitionHandler extends AbstractEventHandler {
withinContext.hiveConf);
}
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 5f582b3..00fa370 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class AlterTableHandler extends AbstractEventHandler {
+class AlterTableHandler extends AbstractEventHandler<AlterTableMessage> {
private final org.apache.hadoop.hive.metastore.api.Table before;
private final org.apache.hadoop.hive.metastore.api.Table after;
private final boolean isTruncateOp;
@@ -59,13 +59,17 @@ class AlterTableHandler extends AbstractEventHandler {
AlterTableHandler(NotificationEvent event) throws Exception {
super(event);
- AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
- before = atm.getTableObjBefore();
- after = atm.getTableObjAfter();
- isTruncateOp = atm.getIsTruncateOp();
+ before = eventMessage.getTableObjBefore();
+ after = eventMessage.getTableObjAfter();
+ isTruncateOp = eventMessage.getIsTruncateOp();
scenario = scenarioType(before, after);
}
+ @Override
+ AlterTableMessage eventMessage(String stringRepresentation) {
+ return deserializer.getAlterTableMessage(stringRepresentation);
+ }
+
private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
org.apache.hadoop.hive.metastore.api.Table after) {
if (before.getDbName().equals(after.getDbName())
@@ -78,7 +82,7 @@ class AlterTableHandler extends AbstractEventHandler {
@Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), eventMessageAsJSON);
Table qlMdTableBefore = new Table(before);
if (!Utils
@@ -100,7 +104,7 @@ class AlterTableHandler extends AbstractEventHandler {
}
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 82a722f..620263f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -40,12 +40,17 @@ import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
-class CommitTxnHandler extends AbstractEventHandler {
+class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
CommitTxnHandler(NotificationEvent event) {
super(event);
}
+ @Override
+ CommitTxnMessage eventMessage(String stringRepresentation) {
+ return deserializer.getCommitTxnMessage(stringRepresentation);
+ }
+
private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
@@ -97,23 +102,22 @@ class CommitTxnHandler extends AbstractEventHandler {
@Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), event.getMessage());
- String payload = event.getMessage();
+ LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON);
+ String payload = eventMessageAsJSON;
if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
- CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage());
String contextDbName = withinContext.dbName == null ? null :
StringUtils.normalizeIdentifier(withinContext.dbName);
String contextTableName = withinContext.tableName == null ? null :
StringUtils.normalizeIdentifier(withinContext.tableName);
List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
- getAllWriteEventInfo(commitTxnMessage.getTxnId(), contextDbName, contextTableName);
+ getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName);
int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
if (numEntry != 0) {
- commitTxnMessage.addWriteEventInfo(writeEventInfoList);
- payload = commitTxnMessage.toString();
- LOG.debug("payload for commit txn event : " + payload);
+ eventMessage.addWriteEventInfo(writeEventInfoList);
+ payload = jsonMessageEncoder.getSerializer().serialize(eventMessage);
+ LOG.debug("payload for commit txn event : " + payload);
}
org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null;
@@ -128,7 +132,7 @@ class CommitTxnHandler extends AbstractEventHandler {
// combination as primary key, so the entries with same table will come together. Only basic table metadata is
// used during import, so we need not dump the latest table metadata.
for (int idx = 0; idx < numEntry; idx++) {
- qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(commitTxnMessage.getTableObj(idx));
+ qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(eventMessage.getTableObj(idx));
if (qlMdTablePrev == null) {
qlMdTablePrev = qlMdTable;
}
@@ -141,13 +145,13 @@ class CommitTxnHandler extends AbstractEventHandler {
qlMdTablePrev = qlMdTable;
}
- if (qlMdTable.isPartitioned() && (null != commitTxnMessage.getPartitionObj(idx))) {
+ if (qlMdTable.isPartitioned() && (null != eventMessage.getPartitionObj(idx))) {
qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable,
- commitTxnMessage.getPartitionObj(idx)));
+ eventMessage.getPartitionObj(idx)));
}
filesTobeAdded.add(Lists.newArrayList(
- ReplChangeManager.getListFromSeparatedString(commitTxnMessage.getFiles(idx))));
+ ReplChangeManager.getListFromSeparatedString(eventMessage.getFiles(idx))));
}
//Dump last table in the list
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
index 21eb74b..7d64e49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
@@ -24,19 +24,22 @@ import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-class CreateDatabaseHandler extends AbstractEventHandler {
+class CreateDatabaseHandler extends AbstractEventHandler<CreateDatabaseMessage> {
CreateDatabaseHandler(NotificationEvent event) {
super(event);
}
@Override
+ CreateDatabaseMessage eventMessage(String stringRepresentation) {
+ return deserializer.getCreateDatabaseMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), event.getMessage());
- CreateDatabaseMessage createDatabaseMsg =
- deserializer.getCreateDatabaseMessage(event.getMessage());
+ LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), eventMessageAsJSON);
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
FileSystem fileSystem = metaDataPath.getFileSystem(withinContext.hiveConf);
- EximUtil.createDbExportDump(fileSystem, metaDataPath, createDatabaseMsg.getDatabaseObject(),
+ EximUtil.createDbExportDump(fileSystem, metaDataPath, eventMessage.getDatabaseObject(),
withinContext.replicationSpec);
withinContext.createDmd(this).write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index 5f0338e..5954e15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -27,21 +27,24 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
-class CreateFunctionHandler extends AbstractEventHandler {
+class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage> {
CreateFunctionHandler(NotificationEvent event) {
super(event);
}
@Override
+ CreateFunctionMessage eventMessage(String stringRepresentation) {
+ return deserializer.getCreateFunctionMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- CreateFunctionMessage createFunctionMessage =
- deserializer.getCreateFunctionMessage(event.getMessage());
- LOG.info("Processing#{} CREATE_MESSAGE message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), eventMessageAsJSON);
Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
- new FunctionSerializer(createFunctionMessage.getFunctionObj(), withinContext.hiveConf)
+ new FunctionSerializer(eventMessage.getFunctionObj(), withinContext.hiveConf)
.writeTo(jsonWriter, withinContext.replicationSpec);
}
withinContext.createDmd(this).write();
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 897ea7f..550a82d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -25,23 +25,26 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
-class CreateTableHandler extends AbstractEventHandler {
+class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
CreateTableHandler(NotificationEvent event) {
super(event);
}
@Override
+ CreateTableMessage eventMessage(String stringRepresentation) {
+ return deserializer.getCreateTableMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
- LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
- org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
+ LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), eventMessageAsJSON);
+ org.apache.hadoop.hive.metastore.api.Table tobj = eventMessage.getTableObj();
if (tobj == null) {
LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed");
@@ -68,7 +71,7 @@ class CreateTableHandler extends AbstractEventHandler {
withinContext.hiveConf);
Path dataPath = new Path(withinContext.eventRoot, "data");
- Iterable<String> files = ctm.getFiles();
+ Iterable<String> files = eventMessage.getFiles();
if (files != null) {
// encoded filename/checksum of files, write into _files
try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
index 8977f62..864cb98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -30,7 +31,15 @@ class DefaultHandler extends AbstractEventHandler {
}
@Override
+ EventMessage eventMessage(String stringRepresentation) {
+ return null;
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
+ // we specifically use the message string from the original event since we don't know what type of message
+ // to convert this message to, this handler should not be called since with different message formats we need
+ // the ability to convert messages to a given message type.
LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
DumpMetaData dmd = withinContext.createDmd(this);
dmd.setPayload(event.getMessage());
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
index 979e9a1..4c239e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
@@ -18,19 +18,26 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class DropConstraintHandler extends AbstractEventHandler {
+class DropConstraintHandler extends AbstractEventHandler<DropConstraintMessage> {
DropConstraintHandler(NotificationEvent event) {
super(event);
}
@Override
+ DropConstraintMessage eventMessage(String stringRepresentation) {
+ return deserializer.getDropConstraintMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} DROP_CONSTRAINT_MESSAGE message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} DROP_CONSTRAINT_MESSAGE message : {}", fromEventId(),
+ eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
index 4eae778..f09f77d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
@@ -18,19 +18,25 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class DropDatabaseHandler extends AbstractEventHandler {
+class DropDatabaseHandler extends AbstractEventHandler<DropDatabaseMessage> {
DropDatabaseHandler(NotificationEvent event) {
super(event);
}
@Override
+ DropDatabaseMessage eventMessage(String stringRepresentation) {
+ return deserializer.getDropDatabaseMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
index 352b0cc..6140c0c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
@@ -18,20 +18,26 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class DropFunctionHandler extends AbstractEventHandler {
+class DropFunctionHandler extends AbstractEventHandler<DropFunctionMessage> {
DropFunctionHandler(NotificationEvent event) {
super(event);
}
@Override
+ DropFunctionMessage eventMessage(String stringRepresentation) {
+ return deserializer.getDropFunctionMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} DROP_FUNCTION message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
index 19b7044..e2a40d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
@@ -19,21 +19,27 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class DropPartitionHandler extends AbstractEventHandler {
+class DropPartitionHandler extends AbstractEventHandler<DropPartitionMessage> {
DropPartitionHandler(NotificationEvent event) {
super(event);
}
@Override
+ DropPartitionMessage eventMessage(String stringRepresentation) {
+ return deserializer.getDropPartitionMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
index cce0192..7d17de2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -19,21 +19,27 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class DropTableHandler extends AbstractEventHandler {
+class DropTableHandler extends AbstractEventHandler<DropTableMessage> {
DropTableHandler(NotificationEvent event) {
super(event);
}
@Override
+ DropTableMessage eventMessage(String stringRepresentation) {
+ return deserializer.getDropTableMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index a1d61f9..2a0379e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -33,27 +33,27 @@ public class EventHandlerFactory {
private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>();
static {
- register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
- register(MessageFactory.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class);
- register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
- register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
- register(MessageFactory.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
- register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
- register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
- register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
- register(MessageFactory.INSERT_EVENT, InsertHandler.class);
- register(MessageFactory.DROP_FUNCTION_EVENT, DropFunctionHandler.class);
- register(MessageFactory.ADD_PRIMARYKEY_EVENT, AddPrimaryKeyHandler.class);
- register(MessageFactory.ADD_FOREIGNKEY_EVENT, AddForeignKeyHandler.class);
- register(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class);
- register(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class);
- register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class);
- register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class);
- register(MessageFactory.DROP_DATABASE_EVENT, DropDatabaseHandler.class);
- register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class);
- register(MessageFactory.COMMIT_TXN_EVENT, CommitTxnHandler.class);
- register(MessageFactory.ABORT_TXN_EVENT, AbortTxnHandler.class);
- register(MessageFactory.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class);
+ register(MessageBuilder.ADD_PARTITION_EVENT, AddPartitionHandler.class);
+ register(MessageBuilder.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class);
+ register(MessageBuilder.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
+ register(MessageBuilder.ALTER_TABLE_EVENT, AlterTableHandler.class);
+ register(MessageBuilder.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
+ register(MessageBuilder.CREATE_TABLE_EVENT, CreateTableHandler.class);
+ register(MessageBuilder.DROP_PARTITION_EVENT, DropPartitionHandler.class);
+ register(MessageBuilder.DROP_TABLE_EVENT, DropTableHandler.class);
+ register(MessageBuilder.INSERT_EVENT, InsertHandler.class);
+ register(MessageBuilder.DROP_FUNCTION_EVENT, DropFunctionHandler.class);
+ register(MessageBuilder.ADD_PRIMARYKEY_EVENT, AddPrimaryKeyHandler.class);
+ register(MessageBuilder.ADD_FOREIGNKEY_EVENT, AddForeignKeyHandler.class);
+ register(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class);
+ register(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class);
+ register(MessageBuilder.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class);
+ register(MessageBuilder.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class);
+ register(MessageBuilder.DROP_DATABASE_EVENT, DropDatabaseHandler.class);
+ register(MessageBuilder.OPEN_TXN_EVENT, OpenTxnHandler.class);
+ register(MessageBuilder.COMMIT_TXN_EVENT, CommitTxnHandler.class);
+ register(MessageBuilder.ABORT_TXN_EVENT, AbortTxnHandler.class);
+ register(MessageBuilder.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class);
}
static void register(String event, Class<? extends EventHandler> handlerClazz) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index cf3822a..842e20a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -36,19 +36,23 @@ import java.util.Collections;
import java.util.List;
-class InsertHandler extends AbstractEventHandler {
+class InsertHandler extends AbstractEventHandler<InsertMessage> {
InsertHandler(NotificationEvent event) {
super(event);
}
@Override
+ InsertMessage eventMessage(String stringRepresentation) {
+ return deserializer.getInsertMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
return;
}
- InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
- org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg);
+ org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(eventMessage);
if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
return;
@@ -58,18 +62,18 @@ class InsertHandler extends AbstractEventHandler {
assert(!AcidUtils.isTransactionalTable(qlMdTable));
List<Partition> qlPtns = null;
- if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) {
- qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));
+ if (qlMdTable.isPartitioned() && (null != eventMessage.getPtnObj())) {
+ qlPtns = Collections.singletonList(partitionObject(qlMdTable, eventMessage));
}
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
// Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation
- withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
+ withinContext.replicationSpec.setIsReplace(eventMessage.isReplace());
EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
qlMdTable, qlPtns,
withinContext.replicationSpec,
withinContext.hiveConf);
- Iterable<String> files = insertMsg.getFiles();
+ Iterable<String> files = eventMessage.getFiles();
if (files != null) {
Path dataPath;
@@ -93,9 +97,9 @@ class InsertHandler extends AbstractEventHandler {
}
}
- LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} INSERT message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
index fe81fe1..215e726 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
@@ -18,20 +18,26 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-class OpenTxnHandler extends AbstractEventHandler {
+class OpenTxnHandler extends AbstractEventHandler<OpenTxnMessage> {
OpenTxnHandler(NotificationEvent event) {
super(event);
}
@Override
+ OpenTxnMessage eventMessage(String stringRepresentation) {
+ return deserializer.getOpenTxnMessage(stringRepresentation);
+ }
+
+ @Override
public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), event.getMessage());
+ LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), eventMessageAsJSON);
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(eventMessageAsJSON);
dmd.write();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
index 2848212..ae3db9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -52,16 +52,16 @@ public class ConstraintsSerializer implements JsonWriter.Serializer {
String pksString, fksString, uksString, nnsString;
pksString = fksString = uksString = nnsString = "";
if (pks != null) {
- pksString = MessageFactory.getInstance().buildAddPrimaryKeyMessage(pks).toString();
+ pksString = MessageBuilder.getInstance().buildAddPrimaryKeyMessage(pks).toString();
}
if (fks != null) {
- fksString = MessageFactory.getInstance().buildAddForeignKeyMessage(fks).toString();
+ fksString = MessageBuilder.getInstance().buildAddForeignKeyMessage(fks).toString();
}
if (uks != null) {
- uksString = MessageFactory.getInstance().buildAddUniqueConstraintMessage(uks).toString();
+ uksString = MessageBuilder.getInstance().buildAddUniqueConstraintMessage(uks).toString();
}
if (nns != null) {
- nnsString = MessageFactory.getInstance().buildAddNotNullConstraintMessage(nns).toString();
+ nnsString = MessageBuilder.getInstance().buildAddNotNullConstraintMessage(nns).toString();
}
writer.jsonGenerator.writeStringField("pks", pksString);
writer.jsonGenerator.writeStringField("uks", uksString);
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
index 5b26681..32ac6ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.ql.parse.repl.load.message;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
@@ -30,7 +30,7 @@ abstract class AbstractMessageHandler implements MessageHandler {
final HashSet<ReadEntity> readEntitySet = new HashSet<>();
final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
final UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker();
- final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+ final MessageDeserializer deserializer = JSONMessageEncoder.getInstance().getDeserializer();
@Override
public Set<ReadEntity> readEntities() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
index 7057890..6a3c563 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.junit.Test;
@@ -53,9 +54,11 @@ public class TestEventHandlerFactory {
@Test
public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
+ NotificationEvent event = new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
+ "shouldGiveDefaultHandler", "s");
+ event.setMessageFormat(JSONMessageEncoder.FORMAT);
EventHandler eventHandler =
- EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
- "shouldGiveDefaultHandler", "s"));
+ EventHandlerFactory.handlerFor(event);
assertTrue(eventHandler instanceof DefaultHandler);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 1795ef7..1d64cce 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -514,7 +514,7 @@ public class MetastoreConf {
"Alternatively, configure hive.metastore.transactional.event.listeners to ensure both are invoked in same JDO transaction."),
EVENT_MESSAGE_FACTORY("metastore.event.message.factory",
"hive.metastore.event.message.factory",
- "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory",
+ "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder",
"Factory class for making encoding and decoding messages in the events generated."),
EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS("metastore.notification.parameters.exclude.patterns",
"hive.metastore.notification.parameters.exclude.patterns", "",
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
index f24b419..1262c12 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
@@ -30,38 +30,38 @@ public abstract class EventMessage {
*/
public enum EventType {
- CREATE_DATABASE(MessageFactory.CREATE_DATABASE_EVENT),
- DROP_DATABASE(MessageFactory.DROP_DATABASE_EVENT),
- CREATE_TABLE(MessageFactory.CREATE_TABLE_EVENT),
- DROP_TABLE(MessageFactory.DROP_TABLE_EVENT),
- ADD_PARTITION(MessageFactory.ADD_PARTITION_EVENT),
- DROP_PARTITION(MessageFactory.DROP_PARTITION_EVENT),
- ALTER_DATABASE(MessageFactory.ALTER_DATABASE_EVENT),
- ALTER_TABLE(MessageFactory.ALTER_TABLE_EVENT),
- ALTER_PARTITION(MessageFactory.ALTER_PARTITION_EVENT),
- INSERT(MessageFactory.INSERT_EVENT),
- CREATE_FUNCTION(MessageFactory.CREATE_FUNCTION_EVENT),
- DROP_FUNCTION(MessageFactory.DROP_FUNCTION_EVENT),
-
- ADD_PRIMARYKEY(MessageFactory.ADD_PRIMARYKEY_EVENT),
- ADD_FOREIGNKEY(MessageFactory.ADD_FOREIGNKEY_EVENT),
- ADD_UNIQUECONSTRAINT(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT),
- ADD_NOTNULLCONSTRAINT(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT),
- DROP_CONSTRAINT(MessageFactory.DROP_CONSTRAINT_EVENT),
- CREATE_ISCHEMA(MessageFactory.CREATE_ISCHEMA_EVENT),
- ALTER_ISCHEMA(MessageFactory.ALTER_ISCHEMA_EVENT),
- DROP_ISCHEMA(MessageFactory.DROP_ISCHEMA_EVENT),
- ADD_SCHEMA_VERSION(MessageFactory.ADD_SCHEMA_VERSION_EVENT),
- ALTER_SCHEMA_VERSION(MessageFactory.ALTER_SCHEMA_VERSION_EVENT),
- DROP_SCHEMA_VERSION(MessageFactory.DROP_SCHEMA_VERSION_EVENT),
- CREATE_CATALOG(MessageFactory.CREATE_CATALOG_EVENT),
- DROP_CATALOG(MessageFactory.DROP_CATALOG_EVENT),
- OPEN_TXN(MessageFactory.OPEN_TXN_EVENT),
- COMMIT_TXN(MessageFactory.COMMIT_TXN_EVENT),
- ABORT_TXN(MessageFactory.ABORT_TXN_EVENT),
- ALLOC_WRITE_ID(MessageFactory.ALLOC_WRITE_ID_EVENT),
- ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT),
- ACID_WRITE(MessageFactory.ACID_WRITE_EVENT);
+ CREATE_DATABASE(MessageBuilder.CREATE_DATABASE_EVENT),
+ DROP_DATABASE(MessageBuilder.DROP_DATABASE_EVENT),
+ CREATE_TABLE(MessageBuilder.CREATE_TABLE_EVENT),
+ DROP_TABLE(MessageBuilder.DROP_TABLE_EVENT),
+ ADD_PARTITION(MessageBuilder.ADD_PARTITION_EVENT),
+ DROP_PARTITION(MessageBuilder.DROP_PARTITION_EVENT),
+ ALTER_DATABASE(MessageBuilder.ALTER_DATABASE_EVENT),
+ ALTER_TABLE(MessageBuilder.ALTER_TABLE_EVENT),
+ ALTER_PARTITION(MessageBuilder.ALTER_PARTITION_EVENT),
+ INSERT(MessageBuilder.INSERT_EVENT),
+ CREATE_FUNCTION(MessageBuilder.CREATE_FUNCTION_EVENT),
+ DROP_FUNCTION(MessageBuilder.DROP_FUNCTION_EVENT),
+
+ ADD_PRIMARYKEY(MessageBuilder.ADD_PRIMARYKEY_EVENT),
+ ADD_FOREIGNKEY(MessageBuilder.ADD_FOREIGNKEY_EVENT),
+ ADD_UNIQUECONSTRAINT(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT),
+ ADD_NOTNULLCONSTRAINT(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT),
+ DROP_CONSTRAINT(MessageBuilder.DROP_CONSTRAINT_EVENT),
+ CREATE_ISCHEMA(MessageBuilder.CREATE_ISCHEMA_EVENT),
+ ALTER_ISCHEMA(MessageBuilder.ALTER_ISCHEMA_EVENT),
+ DROP_ISCHEMA(MessageBuilder.DROP_ISCHEMA_EVENT),
+ ADD_SCHEMA_VERSION(MessageBuilder.ADD_SCHEMA_VERSION_EVENT),
+ ALTER_SCHEMA_VERSION(MessageBuilder.ALTER_SCHEMA_VERSION_EVENT),
+ DROP_SCHEMA_VERSION(MessageBuilder.DROP_SCHEMA_VERSION_EVENT),
+ CREATE_CATALOG(MessageBuilder.CREATE_CATALOG_EVENT),
+ DROP_CATALOG(MessageBuilder.DROP_CATALOG_EVENT),
+ OPEN_TXN(MessageBuilder.OPEN_TXN_EVENT),
+ COMMIT_TXN(MessageBuilder.COMMIT_TXN_EVENT),
+ ABORT_TXN(MessageBuilder.ABORT_TXN_EVENT),
+ ALLOC_WRITE_ID(MessageBuilder.ALLOC_WRITE_ID_EVENT),
+ ALTER_CATALOG(MessageBuilder.ALTER_CATALOG_EVENT),
+ ACID_WRITE(MessageBuilder.ACID_WRITE_EVENT);
private String typeString;