You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/10/22 08:22:00 UTC
[2/4] hive git commit: HIVE-20679: DDL operations on hive might
create large messages for DBNotification (Anishek Agarwal,
reviewed by Sankar Hariappan)
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
new file mode 100644
index 0000000..787b9b2
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.regex.PatternSyntaxException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAbortTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONInsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONOpenTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.filterMapkeys;
+
+public class MessageBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageBuilder.class);
+
+ public static final String ADD_PARTITION_EVENT = "ADD_PARTITION";
+ public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION";
+ public static final String DROP_PARTITION_EVENT = "DROP_PARTITION";
+ public static final String CREATE_TABLE_EVENT = "CREATE_TABLE";
+ public static final String ALTER_TABLE_EVENT = "ALTER_TABLE";
+ public static final String DROP_TABLE_EVENT = "DROP_TABLE";
+ public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE";
+ public static final String ALTER_DATABASE_EVENT = "ALTER_DATABASE";
+ public static final String DROP_DATABASE_EVENT = "DROP_DATABASE";
+ public static final String INSERT_EVENT = "INSERT";
+ public static final String CREATE_FUNCTION_EVENT = "CREATE_FUNCTION";
+ public static final String DROP_FUNCTION_EVENT = "DROP_FUNCTION";
+ public static final String ADD_PRIMARYKEY_EVENT = "ADD_PRIMARYKEY";
+ public static final String ADD_FOREIGNKEY_EVENT = "ADD_FOREIGNKEY";
+ public static final String ADD_UNIQUECONSTRAINT_EVENT = "ADD_UNIQUECONSTRAINT";
+ public static final String ADD_NOTNULLCONSTRAINT_EVENT = "ADD_NOTNULLCONSTRAINT";
+ public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT";
+ public static final String CREATE_ISCHEMA_EVENT = "CREATE_ISCHEMA";
+ public static final String ALTER_ISCHEMA_EVENT = "ALTER_ISCHEMA";
+ public static final String DROP_ISCHEMA_EVENT = "DROP_ISCHEMA";
+ public static final String ADD_SCHEMA_VERSION_EVENT = "ADD_SCHEMA_VERSION";
+ public static final String ALTER_SCHEMA_VERSION_EVENT = "ALTER_SCHEMA_VERSION";
+ public static final String DROP_SCHEMA_VERSION_EVENT = "DROP_SCHEMA_VERSION";
+ public static final String CREATE_CATALOG_EVENT = "CREATE_CATALOG";
+ public static final String DROP_CATALOG_EVENT = "DROP_CATALOG";
+ public static final String OPEN_TXN_EVENT = "OPEN_TXN";
+ public static final String COMMIT_TXN_EVENT = "COMMIT_TXN";
+ public static final String ABORT_TXN_EVENT = "ABORT_TXN";
+ public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
+ public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
+ public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";
+
+ protected static final Configuration conf = MetastoreConf.newMetastoreConf();
+
+ private static final String MS_SERVER_URL = MetastoreConf
+ .getVar(conf, MetastoreConf.ConfVars.THRIFT_URIS, "");
+ private static final String MS_SERVICE_PRINCIPAL =
+ MetastoreConf.getVar(conf, MetastoreConf.ConfVars.KERBEROS_PRINCIPAL, "");
+
+ private static volatile MessageBuilder instance;
+ private static final Object lock = new Object();
+
+ public static MessageBuilder getInstance() {
+ if (instance == null) {
+ synchronized (lock) {
+ if (instance == null) {
+ instance = new MessageBuilder();
+ instance.init();
+ }
+ }
+ }
+ return instance;
+ }
+
+ private static List<Predicate<String>> paramsFilter;
+
+ public void init() {
+ List<String> excludePatterns = Arrays.asList(MetastoreConf
+ .getTrimmedStringsVar(conf,
+ MetastoreConf.ConfVars.EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS));
+ try {
+ paramsFilter = MetaStoreUtils.compilePatternsToPredicates(excludePatterns);
+ } catch (PatternSyntaxException e) {
+ LOG.error("Regex pattern compilation failed. Verify that "
+ + "metastore.notification.parameters.exclude.patterns has valid patterns.");
+ throw new IllegalStateException("Regex pattern compilation failed. " + e.getMessage());
+ }
+ }
+
+ public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
+ return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now());
+ }
+
+ public AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb) {
+ return new JSONAlterDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+ beforeDb, afterDb, now());
+ }
+
+ public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
+ return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now());
+ }
+
+ public CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> fileIter) {
+ return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now());
+ }
+
+ public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp,
+ Long writeId) {
+ return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after,
+ isTruncateOp, writeId, now());
+ }
+
+ public DropTableMessage buildDropTableMessage(Table table) {
+ return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now());
+ }
+
+ public AddPartitionMessage buildAddPartitionMessage(Table table,
+ Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter) {
+ return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
+ partitionsIterator, partitionFileIter, now());
+ }
+
+ public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
+ Partition after, boolean isTruncateOp, Long writeId) {
+ return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+ table, before, after, isTruncateOp, writeId, now());
+ }
+
+ public DropPartitionMessage buildDropPartitionMessage(Table table,
+ Iterator<Partition> partitionsIterator) {
+ return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
+ getPartitionKeyValues(table, partitionsIterator), now());
+ }
+
+ public CreateFunctionMessage buildCreateFunctionMessage(Function fn) {
+ return new JSONCreateFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
+ }
+
+ public DropFunctionMessage buildDropFunctionMessage(Function fn) {
+ return new JSONDropFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
+ }
+
+ public InsertMessage buildInsertMessage(Table tableObj, Partition partObj,
+ boolean replace, Iterator<String> fileIter) {
+ return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+ tableObj, partObj, replace, fileIter, now());
+ }
+
+ public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks) {
+ return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now());
+ }
+
+ public AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks) {
+ return new JSONAddForeignKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fks, now());
+ }
+
+ public AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks) {
+ return new JSONAddUniqueConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, uks, now());
+ }
+
+ public AddNotNullConstraintMessage buildAddNotNullConstraintMessage(
+ List<SQLNotNullConstraint> nns) {
+ return new JSONAddNotNullConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, nns, now());
+ }
+
+ public DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
+ String constraintName) {
+ return new JSONDropConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, dbName, tableName,
+ constraintName, now());
+ }
+
+ public CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog) {
+ return new JSONCreateCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(),
+ now());
+ }
+
+ public AlterCatalogMessage buildAlterCatalogMessage(Catalog beforeCat, Catalog afterCat) {
+ return new JSONAlterCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
+ beforeCat, afterCat, now());
+ }
+
+ public DropCatalogMessage buildDropCatalogMessage(Catalog catalog) {
+ return new JSONDropCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(),
+ now());
+ }
+
+ public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) {
+ return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now());
+ }
+
+ public CommitTxnMessage buildCommitTxnMessage(Long txnId) {
+ return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
+ }
+
+ public AbortTxnMessage buildAbortTxnMessage(Long txnId) {
+ return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
+ }
+
+ public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
+ String dbName, String tableName) {
+ return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList,
+ dbName, tableName, now());
+ }
+
+ public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent,
+ Iterator<String> files) {
+ return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent,
+ files);
+ }
+
+ private long now() {
+ return System.currentTimeMillis() / 1000;
+ }
+
+ public static String createPrimaryKeyObjJson(SQLPrimaryKey primaryKeyObj) throws TException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(primaryKeyObj, "UTF-8");
+ }
+
+ public static String createForeignKeyObjJson(SQLForeignKey foreignKeyObj) throws TException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(foreignKeyObj, "UTF-8");
+ }
+
+ public static String createUniqueConstraintObjJson(SQLUniqueConstraint uniqueConstraintObj)
+ throws TException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(uniqueConstraintObj, "UTF-8");
+ }
+
+ public static String createNotNullConstraintObjJson(SQLNotNullConstraint notNullConstaintObj)
+ throws TException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(notNullConstaintObj, "UTF-8");
+ }
+
+ public static String createDatabaseObjJson(Database dbObj) throws TException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(dbObj, "UTF-8");
+ }
+
+ public static String createCatalogObjJson(Catalog catObj) throws TException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(catObj, "UTF-8");
+ }
+
+ public static String createTableObjJson(Table tableObj) throws TException {
+ //Note: The parameters of the Table object will be removed in the filter if it matches
+ // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
+ filterMapkeys(tableObj.getParameters(), paramsFilter);
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(tableObj, "UTF-8");
+ }
+
+ public static String createPartitionObjJson(Partition partitionObj) throws TException {
+ //Note: The parameters of the Partition object will be removed in the filter if it matches
+ // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
+ filterMapkeys(partitionObj.getParameters(), paramsFilter);
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(partitionObj, "UTF-8");
+ }
+
+ public static String createFunctionObjJson(Function functionObj) throws TException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(functionObj, "UTF-8");
+ }
+
+ public static Table getTableObj(ObjectNode jsonTree) throws Exception {
+ TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
+ Table tableObj = new Table();
+ String tableJson = jsonTree.get("tableObjJson").asText();
+ deSerializer.deserialize(tableObj, tableJson, "UTF-8");
+ return tableObj;
+ }
+
+ /*
+ * TODO: Some thoughts here : We have a current todo to move some of these methods over to
+ * MessageFactory instead of being here, so we can override them, but before we move them over,
+ * we should keep the following in mind:
+ *
+ * a) We should return Iterables, not Lists. That makes sure that we can be memory-safe when
+ * implementing it rather than forcing ourselves down a path wherein returning List is part of
+ * our interface, and then people use .size() or somesuch which makes us need to materialize
+ * the entire list and not change. Also, returning Iterables allows us to do things like
+ * Iterables.transform for some of these.
+ * b) We should not have "magic" names like "tableObjJson", because that breaks expectation of a
+ * couple of things - firstly, that of serialization format, although that is fine for this
+ * JSONMessageEncoder, and secondly, that makes us just have a number of mappings, one for each
+ * obj type, and sometimes, as the case is with alter, have multiples. Also, any event-specific
+ * item belongs in that event message / event itself, as opposed to in the factory. It's okay to
+ * have utility accessor methods here that are used by each of the messages to provide accessors.
+ * I'm adding a couple of those here.
+ *
+ */
+
+ public static TBase getTObj(String tSerialized, Class<? extends TBase> objClass)
+ throws Exception {
+ TDeserializer thriftDeSerializer = new TDeserializer(new TJSONProtocol.Factory());
+ TBase obj = objClass.newInstance();
+ thriftDeSerializer.deserialize(obj, tSerialized, "UTF-8");
+ return obj;
+ }
+
+ public static Iterable<? extends TBase> getTObjs(
+ Iterable<String> objRefStrs, final Class<? extends TBase> objClass) throws Exception {
+
+ try {
+ return Iterables.transform(objRefStrs, new com.google.common.base.Function<String, TBase>() {
+
+ public TBase apply(@Nullable String objStr) {
+ try {
+ return getTObj(objStr, objClass);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException re) {
+ // We have to add this bit of exception handling here, because Function.apply does not allow us to throw
+ // the actual exception that might be a checked exception, so we wind up needing to throw a RuntimeException
+ // with the previously thrown exception as its cause. However, since RuntimeException.getCause() returns
+ // a throwable instead of an Exception, we have to account for the possibility that the underlying code
+ // might have thrown a Throwable that we wrapped instead, in which case, continuing to throw the
+ // RuntimeException is the best thing we can do.
+ Throwable t = re.getCause();
+ if (t instanceof Exception) {
+ throw (Exception) t;
+ } else {
+ throw re;
+ }
+ }
+ }
+
+ // If we do not need this format of accessor using ObjectNode, this is a candidate for removal as well
+ public static Iterable<? extends TBase> getTObjs(
+ ObjectNode jsonTree, String objRefListName, final Class<? extends TBase> objClass)
+ throws Exception {
+ Iterable<JsonNode> jsonArrayIterator = jsonTree.get(objRefListName);
+ return getTObjs(Iterables.transform(jsonArrayIterator, JsonNode::asText), objClass);
+ }
+
+ public static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
+ Map<String, String> partitionKeys = new LinkedHashMap<>();
+ for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
+ partitionKeys.put(table.getPartitionKeys().get(i).getName(),
+ partition.getValues().get(i));
+ }
+ return partitionKeys;
+ }
+
+ public static List<Map<String, String>> getPartitionKeyValues(final Table table,
+ Iterator<Partition> iterator) {
+ return Lists.newArrayList(Iterators
+ .transform(iterator, partition -> getPartitionKeyValues(table, partition)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java
new file mode 100644
index 0000000..832a80c
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageEncoder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+public interface MessageEncoder {
+ MessageDeserializer getDeserializer();
+
+ MessageSerializer getSerializer();
+
+ String getMessageFormat();
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index 58c6891..16e74bb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -20,329 +20,86 @@
package org.apache.hadoop.hive.metastore.messaging;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.Catalog;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
-import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.List;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
/**
* Abstract Factory for the construction of HCatalog message instances.
*/
public abstract class MessageFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageFactory.class.getName());
- // Common name constants for event messages
- public static final String ADD_PARTITION_EVENT = "ADD_PARTITION";
- public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION";
- public static final String DROP_PARTITION_EVENT = "DROP_PARTITION";
- public static final String CREATE_TABLE_EVENT = "CREATE_TABLE";
- public static final String ALTER_TABLE_EVENT = "ALTER_TABLE";
- public static final String DROP_TABLE_EVENT = "DROP_TABLE";
- public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE";
- public static final String ALTER_DATABASE_EVENT = "ALTER_DATABASE";
- public static final String DROP_DATABASE_EVENT = "DROP_DATABASE";
- public static final String INSERT_EVENT = "INSERT";
- public static final String CREATE_FUNCTION_EVENT = "CREATE_FUNCTION";
- public static final String DROP_FUNCTION_EVENT = "DROP_FUNCTION";
- public static final String ADD_PRIMARYKEY_EVENT = "ADD_PRIMARYKEY";
- public static final String ADD_FOREIGNKEY_EVENT = "ADD_FOREIGNKEY";
- public static final String ADD_UNIQUECONSTRAINT_EVENT = "ADD_UNIQUECONSTRAINT";
- public static final String ADD_NOTNULLCONSTRAINT_EVENT = "ADD_NOTNULLCONSTRAINT";
- public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT";
- public static final String CREATE_ISCHEMA_EVENT = "CREATE_ISCHEMA";
- public static final String ALTER_ISCHEMA_EVENT = "ALTER_ISCHEMA";
- public static final String DROP_ISCHEMA_EVENT = "DROP_ISCHEMA";
- public static final String ADD_SCHEMA_VERSION_EVENT = "ADD_SCHEMA_VERSION";
- public static final String ALTER_SCHEMA_VERSION_EVENT = "ALTER_SCHEMA_VERSION";
- public static final String DROP_SCHEMA_VERSION_EVENT = "DROP_SCHEMA_VERSION";
- public static final String CREATE_CATALOG_EVENT = "CREATE_CATALOG";
- public static final String DROP_CATALOG_EVENT = "DROP_CATALOG";
- public static final String OPEN_TXN_EVENT = "OPEN_TXN";
- public static final String COMMIT_TXN_EVENT = "COMMIT_TXN";
- public static final String ABORT_TXN_EVENT = "ABORT_TXN";
- public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
- public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
- public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";
+ protected static final Configuration conf = MetastoreConf.newMetastoreConf();
- private static MessageFactory instance = null;
+ private static final Map<String, Method> registry = new HashMap<>();
- protected static final Configuration conf = MetastoreConf.newMetastoreConf();
- /*
- // TODO MS-SPLIT I'm 99% certain we don't need this, as MetastoreConf.newMetastoreConf already
- adds this resource.
- static {
- conf.addResource("hive-site.xml");
+ public static void register(String messageFormat, Class clazz) {
+ Method method = requiredMethod(clazz);
+ registry.put(messageFormat, method);
}
- */
-
- protected static final String MS_SERVER_URL = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS, "");
- protected static final String MS_SERVICE_PRINCIPAL =
- MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL, "");
+ static {
+ register(GzipJSONMessageEncoder.FORMAT, GzipJSONMessageEncoder.class);
+ register(JSONMessageEncoder.FORMAT, JSONMessageEncoder.class);
+ }
- /**
- * Getter for MessageFactory instance.
- */
- public static MessageFactory getInstance() {
- if (instance == null) {
- instance = getInstance(MetastoreConf.getVar(conf, ConfVars.EVENT_MESSAGE_FACTORY));
+ private static Method requiredMethod(Class clazz) {
+ if (MessageEncoder.class.isAssignableFrom(clazz)) {
+ try {
+ Method methodInstance = clazz.getMethod("getInstance");
+ if (MessageEncoder.class.isAssignableFrom(methodInstance.getReturnType())) {
+ int modifiers = methodInstance.getModifiers();
+ if (Modifier.isStatic(modifiers) && Modifier.isPublic(modifiers)) {
+ return methodInstance;
+ }
+ throw new NoSuchMethodException(
+ "modifier for getInstance() method is not 'public static' in " + clazz
+ .getCanonicalName());
+ }
+ throw new NoSuchMethodException(
+ "return type is not assignable to " + MessageEncoder.class.getCanonicalName());
+ } catch (NoSuchMethodException e) {
+ String message = clazz.getCanonicalName()
+ + " does not implement the required 'public static MessageEncoder getInstance()' method ";
+ LOG.error(message, e);
+ throw new IllegalArgumentException(message, e);
+ }
}
- return instance;
+ String message = clazz.getCanonicalName() + " is not assignable to " + MessageEncoder.class
+ .getCanonicalName();
+ LOG.error(message);
+ throw new IllegalArgumentException(message);
}
- private static MessageFactory getInstance(String className) {
- try {
- MessageFactory factory = JavaUtils.newInstance(JavaUtils.getClass(className, MessageFactory.class));
- factory.init();
- return factory;
- } catch (MetaException e) {
- throw new IllegalStateException("Could not construct MessageFactory implementation: ", e);
+ public static MessageEncoder getInstance(String messageFormat)
+ throws InvocationTargetException, IllegalAccessException {
+ Method methodInstance = registry.get(messageFormat);
+ if (methodInstance == null) {
+ LOG.error("received incorrect MessageFormat " + messageFormat);
+ throw new RuntimeException("messageFormat: " + messageFormat + " is not supported ");
}
+ return (MessageEncoder) methodInstance.invoke(null);
}
- /**
- * Getter for MessageDeserializer, corresponding to the specified format and version.
- * @param format Serialization format for notifications.
- * @param version Version of serialization format (currently ignored.)
- * @return MessageDeserializer.
- */
- public static MessageDeserializer getDeserializer(String format,
- String version) {
- return getInstance(MetastoreConf.getVar(conf, ConfVars.EVENT_MESSAGE_FACTORY)).getDeserializer();
- // Note : The reason this method exists outside the no-arg getDeserializer method is in
- // case there is a user-implemented MessageFactory that's used, and some the messages
- // are in an older format and the rest in another. Then, what MessageFactory is default
- // is irrelevant, we should always use the one that was used to create it to deserialize.
- //
- // There exist only 2 implementations of this - json and jms
- //
- // Additional note : rather than as a config parameter, does it make sense to have
- // this use jdbc-like semantics that each MessageFactory made available register
- // itself for discoverability? Might be worth pursuing.
+ public static MessageEncoder getDefaultInstance(Configuration conf) {
+ String clazz =
+ MetastoreConf.get(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getVarname());
+ try {
+ Class<?> clazzObject = MessageFactory.class.getClassLoader().loadClass(clazz);
+ return (MessageEncoder) requiredMethod(clazzObject).invoke(null);
+ } catch (Exception e) {
+ String message = "could not load the configured class " + clazz;
+ LOG.error(message, e);
+ throw new IllegalStateException(message, e);
+ }
}
-
- public void init() throws MetaException {}
-
- public abstract MessageDeserializer getDeserializer();
-
- /**
- * Getter for message-format.
- */
- public abstract String getMessageFormat();
-
- /**
- * Factory method for CreateDatabaseMessage.
- * @param db The Database being added.
- * @return CreateDatabaseMessage instance.
- */
- public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db);
-
- /**
- * Factory method for AlterDatabaseMessage.
- * @param beforeDb The Database before alter.
- * @param afterDb The Database after alter.
- * @return AlterDatabaseMessage instance.
- */
- public abstract AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb);
-
- /**
- * Factory method for DropDatabaseMessage.
- * @param db The Database being dropped.
- * @return DropDatabaseMessage instance.
- */
- public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db);
-
- /**
- * Factory method for CreateTableMessage.
- * @param table The Table being created.
- * @param files Iterator of files
- * @return CreateTableMessage instance.
- */
- public abstract CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> files);
-
- /**
- * Factory method for AlterTableMessage. Unlike most of these calls, this one can return null,
- * which means no message should be sent. This is because there are many flavors of alter
- * table (add column, add partition, etc.). Some are covered elsewhere (like add partition)
- * and some are not yet supported.
- * @param before The table before the alter
- * @param after The table after the alter
- * @param isTruncateOp Flag to denote truncate table
- * @param writeId writeId under which alter is done (for ACID tables)
- * @return
- */
- public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp,
- Long writeId);
-
- /**
- * Factory method for DropTableMessage.
- * @param table The Table being dropped.
- * @return DropTableMessage instance.
- */
- public abstract DropTableMessage buildDropTableMessage(Table table);
-
- /**
- * Factory method for AddPartitionMessage.
- * @param table The Table to which the partitions are added.
- * @param partitions The iterator to set of Partitions being added.
- * @param partitionFiles The iterator of partition files
- * @return AddPartitionMessage instance.
- */
- public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator<Partition> partitions,
- Iterator<PartitionFiles> partitionFiles);
-
- /**
- * Factory method for building AlterPartitionMessage
- * @param table The table in which the partition is being altered
- * @param before The partition before it was altered
- * @param after The partition after it was altered
- * @param isTruncateOp Flag to denote truncate partition
- * @param writeId writeId under which alter is done (for ACID tables)
- * @return a new AlterPartitionMessage
- */
- public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
- Partition after, boolean isTruncateOp,
- Long writeId);
-
- /**
- * Factory method for DropPartitionMessage.
- * @param table The Table from which the partition is dropped.
- * @param partitions The set of partitions being dropped.
- * @return DropPartitionMessage instance.
- */
- public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions);
-
- /**
- * Factory method for CreateFunctionMessage.
- * @param fn The Function being added.
- * @return CreateFunctionMessage instance.
- */
- public abstract CreateFunctionMessage buildCreateFunctionMessage(Function fn);
-
- /**
- * Factory method for DropFunctionMessage.
- * @param fn The Function being dropped.
- * @return DropFunctionMessage instance.
- */
- public abstract DropFunctionMessage buildDropFunctionMessage(Function fn);
-
- /**
- * Factory method for building insert message
- *
- * @param tableObj Table object where the insert occurred in
- * @param ptnObj Partition object where the insert occurred in, may be null if
- * the insert was done into a non-partitioned table
- * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO
- * @param files Iterator of file created
- * @return instance of InsertMessage
- */
- public abstract InsertMessage buildInsertMessage(Table tableObj, Partition ptnObj,
- boolean replace, Iterator<String> files);
-
- /**
- * Factory method for building open txn message using start and end transaction range
- *
- * @param fromTxnId start transaction id (inclusive)
- * @param toTxnId end transaction id (inclusive)
- * @return instance of OpenTxnMessage
- */
- public abstract OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId);
-
- /**
- * Factory method for building commit txn message
- *
- * @param txnId Id of the transaction to be committed
- * @return instance of CommitTxnMessage
- */
- public abstract CommitTxnMessage buildCommitTxnMessage(Long txnId);
-
- /**
- * Factory method for building abort txn message
- *
- * @param txnId Id of the transaction to be aborted
- * @return instance of AbortTxnMessage
- */
- public abstract AbortTxnMessage buildAbortTxnMessage(Long txnId);
-
- /**
- * Factory method for building alloc write id message
- *
- * @param txnToWriteIdList List of Txn Ids and write id map
- * @param dbName db for which write ids to be allocated
- * @param tableName table for which write ids to be allocated
- * @return instance of AllocWriteIdMessage
- */
- public abstract AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList, String dbName,
- String tableName);
-
- /***
- * Factory method for building add primary key message
- *
- * @param pks list of primary keys
- * @return instance of AddPrimaryKeyMessage
- */
- public abstract AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks);
-
- /***
- * Factory method for building add foreign key message
- *
- * @param fks list of foreign keys
- * @return instance of AddForeignKeyMessage
- */
- public abstract AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks);
-
- /***
- * Factory method for building add unique constraint message
- *
- * @param uks list of unique constraints
- * @return instance of SQLUniqueConstraint
- */
- public abstract AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks);
-
- /***
- * Factory method for building add not null constraint message
- *
- * @param nns list of not null constraints
- * @return instance of SQLNotNullConstraint
- */
- public abstract AddNotNullConstraintMessage buildAddNotNullConstraintMessage(List<SQLNotNullConstraint> nns);
-
- /***
- * Factory method for building drop constraint message
- * @param dbName
- * @param tableName
- * @param constraintName
- * @return
- */
- public abstract DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
- String constraintName);
-
- public abstract CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog);
-
- public abstract DropCatalogMessage buildDropCatalogMessage(Catalog catalog);
-
- public abstract AlterCatalogMessage buildAlterCatalogMessage(Catalog oldCat, Catalog newCat);
-
- /**
- * Factory method for building acid write message
- *
- *
- * @param acidWriteEvent information related to the acid write operation
- * @param files files added by this write operation
- * @return instance of AcidWriteMessage
- */
- public abstract AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
new file mode 100644
index 0000000..b249d76
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageSerializer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+public interface MessageSerializer {
+ default String serialize(EventMessage message) {
+ return message.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
index fdb6942..712c12c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.metastore.messaging.event.filters;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import java.util.regex.Pattern;
@@ -41,9 +41,9 @@ public class DatabaseAndTableFilter extends BasicFilter {
}
private boolean isTxnRelatedEvent(final NotificationEvent event) {
- return ((event.getEventType().equals(MessageFactory.OPEN_TXN_EVENT)) ||
- (event.getEventType().equals(MessageFactory.COMMIT_TXN_EVENT)) ||
- (event.getEventType().equals(MessageFactory.ABORT_TXN_EVENT)));
+ return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) ||
+ (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) ||
+ (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT)));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
index cc528ee..a5d8f78 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import java.util.Iterator;
import java.util.List;
@@ -60,9 +61,9 @@ public class JSONAcidWriteMessage extends AcidWriteMessage {
this.writeId = acidWriteEvent.getWriteId();
this.partition = acidWriteEvent.getPartition();
try {
- this.tableObjJson = JSONMessageFactory.createTableObjJson(acidWriteEvent.getTableObj());
+ this.tableObjJson = MessageBuilder.createTableObjJson(acidWriteEvent.getTableObj());
if (acidWriteEvent.getPartitionObj() != null) {
- this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(acidWriteEvent.getPartitionObj());
+ this.partitionObjJson = MessageBuilder.createPartitionObjJson(acidWriteEvent.getPartitionObj());
} else {
this.partitionObjJson = null;
}
@@ -119,13 +120,13 @@ public class JSONAcidWriteMessage extends AcidWriteMessage {
@Override
public Table getTableObj() throws Exception {
- return (tableObjJson == null) ? null : (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+ return (tableObjJson == null) ? null : (Table) MessageBuilder.getTObj(tableObjJson, Table.class);
}
@Override
public Partition getPartitionObj() throws Exception {
return ((partitionObjJson == null) ? null :
- (Partition) JSONMessageFactory.getTObj(partitionObjJson, Partition.class));
+ (Partition) MessageBuilder.getTObj(partitionObjJson, Partition.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
index d4a0bc2..c3d6fb6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -55,7 +56,7 @@ public class JSONAddForeignKeyMessage extends AddForeignKeyMessage {
this.foreignKeyListJson = new ArrayList<>();
try {
for (SQLForeignKey pk : fks) {
- foreignKeyListJson.add(JSONMessageFactory.createForeignKeyObjJson(pk));
+ foreignKeyListJson.add(MessageBuilder.createForeignKeyObjJson(pk));
}
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
@@ -86,7 +87,7 @@ public class JSONAddForeignKeyMessage extends AddForeignKeyMessage {
public List<SQLForeignKey> getForeignKeys() throws Exception {
List<SQLForeignKey> fks = new ArrayList<>();
for (String pkJson : foreignKeyListJson) {
- fks.add((SQLForeignKey)JSONMessageFactory.getTObj(pkJson, SQLForeignKey.class));
+ fks.add((SQLForeignKey) MessageBuilder.getTObj(pkJson, SQLForeignKey.class));
}
return fks;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
index 1c3e8a8..f9f351f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddNotNullConstraintMessage.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -50,7 +51,7 @@ public class JSONAddNotNullConstraintMessage extends AddNotNullConstraintMessage
this.notNullConstraintListJson = new ArrayList<>();
try {
for (SQLNotNullConstraint nn : nns) {
- notNullConstraintListJson.add(JSONMessageFactory.createNotNullConstraintObjJson(nn));
+ notNullConstraintListJson.add(MessageBuilder.createNotNullConstraintObjJson(nn));
}
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
@@ -81,7 +82,7 @@ public class JSONAddNotNullConstraintMessage extends AddNotNullConstraintMessage
public List<SQLNotNullConstraint> getNotNullConstraints() throws Exception {
List<SQLNotNullConstraint> nns = new ArrayList<>();
for (String nnJson : notNullConstraintListJson) {
- nns.add((SQLNotNullConstraint)JSONMessageFactory.getTObj(nnJson, SQLNotNullConstraint.class));
+ nns.add((SQLNotNullConstraint) MessageBuilder.getTObj(nnJson, SQLNotNullConstraint.class));
}
return nns;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
index bb2093b..6494cb8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
import org.apache.thrift.TException;
@@ -79,11 +80,11 @@ public class JSONAddPartitionMessage extends AddPartitionMessage {
partitionListJson = new ArrayList<>();
Partition partitionObj;
try {
- this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+ this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
while (partitionsIterator.hasNext()) {
partitionObj = partitionsIterator.next();
- partitions.add(JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObj));
- partitionListJson.add(JSONMessageFactory.createPartitionObjJson(partitionObj));
+ partitions.add(MessageBuilder.getPartitionKeyValues(tableObj, partitionObj));
+ partitionListJson.add(MessageBuilder.createPartitionObjJson(partitionObj));
}
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
@@ -124,7 +125,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage {
@Override
public Table getTableObj() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
}
@Override
@@ -141,7 +142,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage {
public Iterable<Partition> getPartitionObjs() throws Exception {
// glorified cast from Iterable<TBase> to Iterable<Partition>
return Iterables.transform(
- JSONMessageFactory.getTObjs(partitionListJson,Partition.class),
+ MessageBuilder.getTObjs(partitionListJson,Partition.class),
new Function<Object, Partition>() {
@Nullable
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
index 3a18be8..606a051 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -55,7 +56,7 @@ public class JSONAddPrimaryKeyMessage extends AddPrimaryKeyMessage {
this.primaryKeyListJson = new ArrayList<>();
try {
for (SQLPrimaryKey pk : pks) {
- primaryKeyListJson.add(JSONMessageFactory.createPrimaryKeyObjJson(pk));
+ primaryKeyListJson.add(MessageBuilder.createPrimaryKeyObjJson(pk));
}
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
@@ -86,7 +87,7 @@ public class JSONAddPrimaryKeyMessage extends AddPrimaryKeyMessage {
public List<SQLPrimaryKey> getPrimaryKeys() throws Exception {
List<SQLPrimaryKey> pks = new ArrayList<>();
for (String pkJson : primaryKeyListJson) {
- pks.add((SQLPrimaryKey)JSONMessageFactory.getTObj(pkJson, SQLPrimaryKey.class));
+ pks.add((SQLPrimaryKey) MessageBuilder.getTObj(pkJson, SQLPrimaryKey.class));
}
return pks;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
index 3c4d5e0..ebdcd94 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddUniqueConstraintMessage.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -52,7 +53,7 @@ public class JSONAddUniqueConstraintMessage extends AddUniqueConstraintMessage {
this.uniqueConstraintListJson = new ArrayList<>();
try {
for (SQLUniqueConstraint uk : uks) {
- uniqueConstraintListJson.add(JSONMessageFactory.createUniqueConstraintObjJson(uk));
+ uniqueConstraintListJson.add(MessageBuilder.createUniqueConstraintObjJson(uk));
}
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
@@ -83,7 +84,7 @@ public class JSONAddUniqueConstraintMessage extends AddUniqueConstraintMessage {
public List<SQLUniqueConstraint> getUniqueConstraints() throws Exception {
List<SQLUniqueConstraint> uks = new ArrayList<>();
for (String pkJson : uniqueConstraintListJson) {
- uks.add((SQLUniqueConstraint)JSONMessageFactory.getTObj(pkJson, SQLUniqueConstraint.class));
+ uks.add((SQLUniqueConstraint) MessageBuilder.getTObj(pkJson, SQLUniqueConstraint.class));
}
return uks;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
index 779c0b0..7b7c12e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterCatalogMessage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.hive.metastore.api.Catalog;
import org.apache.hadoop.hive.metastore.messaging.AlterCatalogMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
public class JSONAlterCatalogMessage extends AlterCatalogMessage {
@@ -41,8 +42,8 @@ public class JSONAlterCatalogMessage extends AlterCatalogMessage {
this.servicePrincipal = servicePrincipal;
this.timestamp = timestamp;
try {
- this.catObjBeforeJson = JSONMessageFactory.createCatalogObjJson(catObjBefore);
- this.catObjAfterJson = JSONMessageFactory.createCatalogObjJson(catObjAfter);
+ this.catObjBeforeJson = MessageBuilder.createCatalogObjJson(catObjBefore);
+ this.catObjAfterJson = MessageBuilder.createCatalogObjJson(catObjAfter);
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
}
@@ -71,12 +72,12 @@ public class JSONAlterCatalogMessage extends AlterCatalogMessage {
@Override
public Catalog getCatObjBefore() throws Exception {
- return (Catalog) JSONMessageFactory.getTObj(catObjBeforeJson, Catalog.class);
+ return (Catalog) MessageBuilder.getTObj(catObjBeforeJson, Catalog.class);
}
@Override
public Catalog getCatObjAfter() throws Exception {
- return (Catalog) JSONMessageFactory.getTObj(catObjAfterJson, Catalog.class);
+ return (Catalog) MessageBuilder.getTObj(catObjAfterJson, Catalog.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
index 7b316d5..5f9dae4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterDatabaseMessage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -48,8 +49,8 @@ public class JSONAlterDatabaseMessage extends AlterDatabaseMessage {
this.db = dbObjBefore.getName();
this.timestamp = timestamp;
try {
- this.dbObjBeforeJson = JSONMessageFactory.createDatabaseObjJson(dbObjBefore);
- this.dbObjAfterJson = JSONMessageFactory.createDatabaseObjJson(dbObjAfter);
+ this.dbObjBeforeJson = MessageBuilder.createDatabaseObjJson(dbObjBefore);
+ this.dbObjAfterJson = MessageBuilder.createDatabaseObjJson(dbObjAfter);
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
}
@@ -78,12 +79,12 @@ public class JSONAlterDatabaseMessage extends AlterDatabaseMessage {
@Override
public Database getDbObjBefore() throws Exception {
- return (Database) JSONMessageFactory.getTObj(dbObjBeforeJson, Database.class);
+ return (Database) MessageBuilder.getTObj(dbObjBeforeJson, Database.class);
}
@Override
public Database getDbObjAfter() throws Exception {
- return (Database) JSONMessageFactory.getTObj(dbObjAfterJson, Database.class);
+ return (Database) MessageBuilder.getTObj(dbObjAfterJson, Database.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
index 9b85f4c..a38c1aa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -62,12 +63,12 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
this.tableType = tableObj.getTableType();
this.isTruncateOp = Boolean.toString(isTruncateOp);
this.timestamp = timestamp;
- this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore);
+ this.keyValues = MessageBuilder.getPartitionKeyValues(tableObj, partitionObjBefore);
this.writeId = writeId;
try {
- this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
- this.partitionObjBeforeJson = JSONMessageFactory.createPartitionObjJson(partitionObjBefore);
- this.partitionObjAfterJson = JSONMessageFactory.createPartitionObjJson(partitionObjAfter);
+ this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
+ this.partitionObjBeforeJson = MessageBuilder.createPartitionObjJson(partitionObjBefore);
+ this.partitionObjAfterJson = MessageBuilder.createPartitionObjJson(partitionObjAfter);
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
}
@@ -118,17 +119,17 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
@Override
public Table getTableObj() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
}
@Override
public Partition getPtnObjBefore() throws Exception {
- return (Partition) JSONMessageFactory.getTObj(partitionObjBeforeJson, Partition.class);
+ return (Partition) MessageBuilder.getTObj(partitionObjBeforeJson, Partition.class);
}
@Override
public Partition getPtnObjAfter() throws Exception {
- return (Partition) JSONMessageFactory.getTObj(partitionObjAfterJson, Partition.class);
+ return (Partition) MessageBuilder.getTObj(partitionObjAfterJson, Partition.class);
}
public String getTableObjJson() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
index eddff98..d6ec826 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -55,8 +56,8 @@ public class JSONAlterTableMessage extends AlterTableMessage {
this.timestamp = timestamp;
this.writeId = writeId;
try {
- this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore);
- this.tableObjAfterJson = JSONMessageFactory.createTableObjJson(tableObjAfter);
+ this.tableObjBeforeJson = MessageBuilder.createTableObjJson(tableObjBefore);
+ this.tableObjAfterJson = MessageBuilder.createTableObjJson(tableObjAfter);
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
}
@@ -102,12 +103,12 @@ public class JSONAlterTableMessage extends AlterTableMessage {
@Override
public Table getTableObjBefore() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjBeforeJson,Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjBeforeJson,Table.class);
}
@Override
public Table getTableObjAfter() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjAfterJson,Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjAfterJson,Table.class);
}
public String getTableObjBeforeJson() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
index 2c4940b..482fc8e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import java.util.List;
@@ -118,13 +119,13 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
@Override
public Table getTableObj(int idx) throws Exception {
- return tableObjs == null ? null : (Table) JSONMessageFactory.getTObj(tableObjs.get(idx), Table.class);
+ return tableObjs == null ? null : (Table) MessageBuilder.getTObj(tableObjs.get(idx), Table.class);
}
@Override
public Partition getPartitionObj(int idx) throws Exception {
return (partitionObjs == null ? null : (partitionObjs.get(idx) == null ? null :
- (Partition)JSONMessageFactory.getTObj(partitionObjs.get(idx), Partition.class)));
+ (Partition) MessageBuilder.getTObj(partitionObjs.get(idx), Partition.class)));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
index 761ff99..1f5c9e8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -48,7 +49,7 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
this.db = db.getName();
this.timestamp = timestamp;
try {
- this.dbJson = JSONMessageFactory.createDatabaseObjJson(db);
+ this.dbJson = MessageBuilder.createDatabaseObjJson(db);
} catch (TException ex) {
throw new IllegalArgumentException("Could not serialize Function object", ex);
}
@@ -57,7 +58,7 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
@Override
public Database getDatabaseObject() throws Exception {
- return (Database) JSONMessageFactory.getTObj(dbJson, Database.class);
+ return (Database) MessageBuilder.getTObj(dbJson, Database.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
index f7287df..bb50052 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging.json;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -47,7 +48,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage {
this.db = fn.getDbName();
this.timestamp = timestamp;
try {
- this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+ this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
} catch (TException ex) {
throw new IllegalArgumentException("Could not serialize Function object", ex);
}
@@ -72,7 +73,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage {
@Override
public Function getFunctionObj() throws Exception {
- return (Function) JSONMessageFactory.getTObj(functionObjJson,Function.class);
+ return (Function) MessageBuilder.getTObj(functionObjJson,Function.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
index b80003b..145ee4b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.thrift.TException;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -68,7 +69,7 @@ public class JSONCreateTableMessage extends CreateTableMessage {
this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
tableObj.getTableType(), timestamp);
try {
- this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+ this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
}
@@ -111,7 +112,7 @@ public class JSONCreateTableMessage extends CreateTableMessage {
@Override
public Table getTableObj() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
}
public String getTableObjJson() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
index 957d595..23e5496 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.thrift.TException;
@@ -70,7 +71,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage {
this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
tableObj.getTableType(), partitionKeyValues, timestamp);
try {
- this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+ this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
}
@@ -117,7 +118,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage {
@Override
public Table getTableObj() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjJson, Table.class);
}
public String getTableObjJson() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
index 88374ec..1ef2ad0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropTableMessage.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hive.metastore.messaging.json;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
import org.apache.thrift.TException;
@@ -63,7 +64,7 @@ public class JSONDropTableMessage extends DropTableMessage {
this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
tableObj.getTableType(), timestamp);
try {
- this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+ this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
} catch (TException e) {
throw new IllegalArgumentException("Could not serialize: ", e);
}
@@ -86,7 +87,7 @@ public class JSONDropTableMessage extends DropTableMessage {
@Override
public Table getTableObj() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index 2318a67..40d480b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.thrift.TException;
@@ -67,9 +68,9 @@ public class JSONInsertMessage extends InsertMessage {
this.tableType = tableObj.getTableType();
try {
- this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+ this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
if (null != ptnObj) {
- this.ptnObjJson = JSONMessageFactory.createPartitionObjJson(ptnObj);
+ this.ptnObjJson = MessageBuilder.createPartitionObjJson(ptnObj);
} else {
this.ptnObjJson = null;
}
@@ -128,12 +129,12 @@ public class JSONInsertMessage extends InsertMessage {
@Override
public Table getTableObj() throws Exception {
- return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class);
+ return (Table) MessageBuilder.getTObj(tableObjJson,Table.class);
}
@Override
public Partition getPtnObj() throws Exception {
- return ((null == ptnObjJson) ? null : (Partition) JSONMessageFactory.getTObj(ptnObjJson, Partition.class));
+ return ((null == ptnObjJson) ? null : (Partition) MessageBuilder.getTObj(ptnObjJson, Partition.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java
new file mode 100644
index 0000000..08c53e4
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageEncoder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+
+/**
+ * The JSON implementation of the MessageFactory. Constructs JSON implementations of each
+ * message-type.
+ */
+public class JSONMessageEncoder implements MessageEncoder {
+ public static final String FORMAT = "json-0.2";
+
+ private static MessageDeserializer deserializer = new JSONMessageDeserializer();
+ private static MessageSerializer serializer = new MessageSerializer() {
+ };
+
+ private static volatile MessageEncoder instance;
+
+ public static MessageEncoder getInstance() {
+ if (instance == null) {
+ synchronized (GzipJSONMessageEncoder.class) {
+ if (instance == null) {
+ instance = new JSONMessageEncoder();
+ }
+ }
+ }
+ return instance;
+ }
+
+ @Override
+ public MessageDeserializer getDeserializer() {
+ return deserializer;
+ }
+
+ @Override
+ public MessageSerializer getSerializer() {
+ return serializer;
+ }
+
+ /**
+ * This is a format that's shipped, for any changes make sure that backward compatibiltiy
+ * with existing messages in this format are taken care of.
+ *
+ */
+ @Override
+ public String getMessageFormat() {
+ return FORMAT;
+ }
+}