You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:02 UTC
[52/75] [abbrv] hive git commit: HIVE-20679: DDL operations on hive
might create large messages for DBNotification (Anishek Agarwal,
reviewed by Sankar Hariappan)
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
deleted file mode 100644
index 6aa079d..0000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore.messaging.json;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Predicate;
-import java.util.regex.PatternSyntaxException;
-
-import javax.annotation.Nullable;
-
-import org.apache.hadoop.hive.metastore.api.Catalog;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
-import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
-import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
-import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterCatalogMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateCatalogMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropCatalogMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.filterMapkeys;
-
-/**
- * The JSON implementation of the MessageFactory. Constructs JSON implementations of each
- * message-type.
- */
-public class JSONMessageFactory extends MessageFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName());
-
- private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
-
- private static List<Predicate<String>> paramsFilter;
-
- @Override
- public void init() throws MetaException {
- super.init();
-
- List<String> excludePatterns = Arrays.asList(MetastoreConf
- .getTrimmedStringsVar(conf, MetastoreConf.ConfVars.EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS));
- try {
- paramsFilter = MetaStoreUtils.compilePatternsToPredicates(excludePatterns);
- } catch (PatternSyntaxException e) {
- LOG.error("Regex pattern compilation failed. Verify that "
- + "metastore.notification.parameters.exclude.patterns has valid patterns.");
- throw new MetaException("Regex pattern compilation failed. " + e.getMessage());
- }
- }
-
- @Override
- public MessageDeserializer getDeserializer() {
- return deserializer;
- }
-
- @Override
- public String getMessageFormat() {
- return "json-0.2";
- }
-
- @Override
- public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
- return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now());
- }
-
- @Override
- public AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb) {
- return new JSONAlterDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
- beforeDb, afterDb, now());
- }
-
- @Override
- public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
- return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now());
- }
-
- @Override
- public CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> fileIter) {
- return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now());
- }
-
- @Override
- public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, Long writeId) {
- return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, isTruncateOp, writeId, now());
- }
-
- @Override
- public DropTableMessage buildDropTableMessage(Table table) {
- return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now());
- }
-
- @Override
- public AddPartitionMessage buildAddPartitionMessage(Table table,
- Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter) {
- return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
- partitionsIterator, partitionFileIter, now());
- }
-
- @Override
- public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
- Partition after, boolean isTruncateOp, Long writeId) {
- return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
- table, before, after, isTruncateOp, writeId, now());
- }
-
- @Override
- public DropPartitionMessage buildDropPartitionMessage(Table table,
- Iterator<Partition> partitionsIterator) {
- return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
- getPartitionKeyValues(table, partitionsIterator), now());
- }
-
- @Override
- public CreateFunctionMessage buildCreateFunctionMessage(Function fn) {
- return new JSONCreateFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
- }
-
- @Override
- public DropFunctionMessage buildDropFunctionMessage(Function fn) {
- return new JSONDropFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now());
- }
-
- @Override
- public InsertMessage buildInsertMessage(Table tableObj, Partition partObj,
- boolean replace, Iterator<String> fileIter) {
- return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
- tableObj, partObj, replace, fileIter, now());
- }
-
- @Override
- public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks) {
- return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now());
- }
-
- @Override
- public AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks) {
- return new JSONAddForeignKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fks, now());
- }
-
- @Override
- public AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks) {
- return new JSONAddUniqueConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, uks, now());
- }
-
- @Override
- public AddNotNullConstraintMessage buildAddNotNullConstraintMessage(List<SQLNotNullConstraint> nns) {
- return new JSONAddNotNullConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, nns, now());
- }
-
- @Override
- public DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
- String constraintName) {
- return new JSONDropConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, dbName, tableName,
- constraintName, now());
- }
-
- @Override
- public CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog) {
- return new JSONCreateCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(), now());
- }
-
- @Override
- public AlterCatalogMessage buildAlterCatalogMessage(Catalog beforeCat, Catalog afterCat) {
- return new JSONAlterCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL,
- beforeCat, afterCat, now());
- }
-
- @Override
- public DropCatalogMessage buildDropCatalogMessage(Catalog catalog) {
- return new JSONDropCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(), now());
- }
-
- @Override
- public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) {
- return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now());
- }
-
- @Override
- public CommitTxnMessage buildCommitTxnMessage(Long txnId) {
- return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
- }
-
- @Override
- public AbortTxnMessage buildAbortTxnMessage(Long txnId) {
- return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
- }
-
- @Override
- public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
- String dbName, String tableName) {
- return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList, dbName, tableName, now());
- }
-
- @Override
- public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files) {
- return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent, files);
- }
-
- private long now() {
- return System.currentTimeMillis() / 1000;
- }
-
- static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
- Map<String, String> partitionKeys = new LinkedHashMap<>();
- for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
- partitionKeys.put(table.getPartitionKeys().get(i).getName(), partition.getValues().get(i));
- }
- return partitionKeys;
- }
-
- static List<Map<String, String>> getPartitionKeyValues(final Table table,
- Iterator<Partition> iterator) {
- return Lists.newArrayList(Iterators.transform(iterator,
- new com.google.common.base.Function<Partition, Map<String, String>>() {
- @Override
- public Map<String, String> apply(@Nullable Partition partition) {
- return getPartitionKeyValues(table, partition);
- }
- }));
- }
-
- static String createPrimaryKeyObjJson(SQLPrimaryKey primaryKeyObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(primaryKeyObj, "UTF-8");
- }
-
- static String createForeignKeyObjJson(SQLForeignKey foreignKeyObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(foreignKeyObj, "UTF-8");
- }
-
- static String createUniqueConstraintObjJson(SQLUniqueConstraint uniqueConstraintObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(uniqueConstraintObj, "UTF-8");
- }
-
- static String createNotNullConstraintObjJson(SQLNotNullConstraint notNullConstaintObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(notNullConstaintObj, "UTF-8");
- }
-
- static String createDatabaseObjJson(Database dbObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(dbObj, "UTF-8");
- }
-
- static String createCatalogObjJson(Catalog catObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(catObj, "UTF-8");
- }
-
- static String createTableObjJson(Table tableObj) throws TException {
- //Note: The parameters of the Table object will be removed in the filter if it matches
- // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
- filterMapkeys(tableObj.getParameters(), paramsFilter);
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(tableObj, "UTF-8");
- }
-
- static String createPartitionObjJson(Partition partitionObj) throws TException {
- //Note: The parameters of the Partition object will be removed in the filter if it matches
- // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS
- filterMapkeys(partitionObj.getParameters(), paramsFilter);
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(partitionObj, "UTF-8");
- }
-
- static String createFunctionObjJson(Function functionObj) throws TException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- return serializer.toString(functionObj, "UTF-8");
- }
-
- public static ObjectNode getJsonTree(NotificationEvent event) throws Exception {
- return getJsonTree(event.getMessage());
- }
-
- public static ObjectNode getJsonTree(String eventMessage) throws Exception {
- JsonParser jsonParser = (new JsonFactory()).createJsonParser(eventMessage);
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(jsonParser, ObjectNode.class);
- }
-
- public static Table getTableObj(ObjectNode jsonTree) throws Exception {
- TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
- Table tableObj = new Table();
- String tableJson = jsonTree.get("tableObjJson").asText();
- deSerializer.deserialize(tableObj, tableJson, "UTF-8");
- return tableObj;
- }
-
- /*
- * TODO: Some thoughts here : We have a current todo to move some of these methods over to
- * MessageFactory instead of being here, so we can override them, but before we move them over,
- * we should keep the following in mind:
- *
- * a) We should return Iterables, not Lists. That makes sure that we can be memory-safe when
- * implementing it rather than forcing ourselves down a path wherein returning List is part of
- * our interface, and then people use .size() or somesuch which makes us need to materialize
- * the entire list and not change. Also, returning Iterables allows us to do things like
- * Iterables.transform for some of these.
- * b) We should not have "magic" names like "tableObjJson", because that breaks expectation of a
- * couple of things - firstly, that of serialization format, although that is fine for this
- * JSONMessageFactory, and secondly, that makes us just have a number of mappings, one for each
- * obj type, and sometimes, as the case is with alter, have multiples. Also, any event-specific
- * item belongs in that event message / event itself, as opposed to in the factory. It's okay to
- * have utility accessor methods here that are used by each of the messages to provide accessors.
- * I'm adding a couple of those here.
- *
- */
-
- public static TBase getTObj(String tSerialized, Class<? extends TBase> objClass) throws Exception{
- TDeserializer thriftDeSerializer = new TDeserializer(new TJSONProtocol.Factory());
- TBase obj = objClass.newInstance();
- thriftDeSerializer.deserialize(obj, tSerialized, "UTF-8");
- return obj;
- }
-
- public static Iterable<? extends TBase> getTObjs(
- Iterable<String> objRefStrs, final Class<? extends TBase> objClass) throws Exception {
-
- try {
- return Iterables.transform(objRefStrs, new com.google.common.base.Function<String,TBase>(){
- @Override
- public TBase apply(@Nullable String objStr){
- try {
- return getTObj(objStr, objClass);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
- } catch (RuntimeException re){
- // We have to add this bit of exception handling here, because Function.apply does not allow us to throw
- // the actual exception that might be a checked exception, so we wind up needing to throw a RuntimeException
- // with the previously thrown exception as its cause. However, since RuntimeException.getCause() returns
- // a throwable instead of an Exception, we have to account for the possibility that the underlying code
- // might have thrown a Throwable that we wrapped instead, in which case, continuing to throw the
- // RuntimeException is the best thing we can do.
- Throwable t = re.getCause();
- if (t instanceof Exception){
- throw (Exception) t;
- } else {
- throw re;
- }
- }
- }
-
- // If we do not need this format of accessor using ObjectNode, this is a candidate for removal as well
- public static Iterable<? extends TBase> getTObjs(
- ObjectNode jsonTree, String objRefListName, final Class<? extends TBase> objClass) throws Exception {
- Iterable<JsonNode> jsonArrayIterator = jsonTree.get(objRefListName);
- com.google.common.base.Function<JsonNode,String> textExtractor =
- new com.google.common.base.Function<JsonNode, String>() {
- @Nullable
- @Override
- public String apply(@Nullable JsonNode input) {
- return input.asText();
- }
- };
- return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
new file mode 100644
index 0000000..9c64e33
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java
@@ -0,0 +1,181 @@
+package org.apache.hadoop.hive.metastore.messaging.json.gzip;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.zip.GZIPInputStream;
+
+
+/**
+ * Deserializer for message bodies that are Base64 encoded gzip streams of JSON text. Every
+ * accessor first Base64 decodes and gunzips the body and then delegates to the plain JSON
+ * deserializer in the superclass.
+ */
+public class DeSerializer extends JSONMessageDeserializer {
+  // log under this class' own name (the Serializer class was referenced here before)
+  private static final Logger LOG = LoggerFactory.getLogger(DeSerializer.class.getName());
+
+  /**
+   * Reverses the encoding of a message body: Base64 decode, gunzip, read as UTF-8 text.
+   *
+   * @param messageBody Base64 text whose decoded bytes form a gzip stream
+   * @return the decompressed text
+   * @throws RuntimeException wrapping the IOException raised while reading the gzip stream
+   */
+  private static String deCompress(String messageBody) {
+    byte[] decodedBytes = Base64.getDecoder().decode(messageBody.getBytes(StandardCharsets.UTF_8));
+    try (
+        ByteArrayInputStream in = new ByteArrayInputStream(decodedBytes);
+        GZIPInputStream is = new GZIPInputStream(in)
+    ) {
+      byte[] bytes = IOUtils.toByteArray(is);
+      return new String(bytes, StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      LOG.error("cannot decode the stream", e);
+      // the placeholder is required, without it the body argument is silently dropped
+      LOG.debug("base64 encoded String {}", messageBody);
+      throw new RuntimeException("cannot decode the stream ", e);
+    }
+  }
+
+  /**
+   * this is mainly as a utility to allow debugging of messages for developers by providing the
+   * message in a file and getting an actual message out.
+   * This class on a deployed hive instance will also be bundled in hive-exec jar.
+   *
+   * @param args exactly one element, the location of the file holding the encoded message
+   * @throws IOException if the file cannot be read
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 1) {
+      System.out.println("Usage:");
+      System.out.println(
+          "java -cp [classpath] " + DeSerializer.class.getCanonicalName() + " [file_location]");
+      // stop here, otherwise args[0] below fails when no argument was given
+      return;
+    }
+    // the Base64 basic decoder rejects line separators, so drop the whitespace (typically a
+    // trailing newline) that surrounds the encoded text in a file
+    String encoded = FileUtils.readFileToString(new File(args[0]), StandardCharsets.UTF_8).trim();
+    System.out.print(deCompress(encoded));
+  }
+
+  // Every accessor below decompresses the body and hands the result to the superclass.
+
+  @Override
+  public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) {
+    return super.getCreateDatabaseMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AlterDatabaseMessage getAlterDatabaseMessage(String messageBody) {
+    return super.getAlterDatabaseMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropDatabaseMessage getDropDatabaseMessage(String messageBody) {
+    return super.getDropDatabaseMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public CreateTableMessage getCreateTableMessage(String messageBody) {
+    return super.getCreateTableMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AlterTableMessage getAlterTableMessage(String messageBody) {
+    return super.getAlterTableMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropTableMessage getDropTableMessage(String messageBody) {
+    return super.getDropTableMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddPartitionMessage getAddPartitionMessage(String messageBody) {
+    return super.getAddPartitionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AlterPartitionMessage getAlterPartitionMessage(String messageBody) {
+    return super.getAlterPartitionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropPartitionMessage getDropPartitionMessage(String messageBody) {
+    return super.getDropPartitionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public CreateFunctionMessage getCreateFunctionMessage(String messageBody) {
+    return super.getCreateFunctionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropFunctionMessage getDropFunctionMessage(String messageBody) {
+    return super.getDropFunctionMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public InsertMessage getInsertMessage(String messageBody) {
+    return super.getInsertMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody) {
+    return super.getAddPrimaryKeyMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddForeignKeyMessage getAddForeignKeyMessage(String messageBody) {
+    return super.getAddForeignKeyMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddUniqueConstraintMessage getAddUniqueConstraintMessage(String messageBody) {
+    return super.getAddUniqueConstraintMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AddNotNullConstraintMessage getAddNotNullConstraintMessage(String messageBody) {
+    return super.getAddNotNullConstraintMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public DropConstraintMessage getDropConstraintMessage(String messageBody) {
+    return super.getDropConstraintMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public OpenTxnMessage getOpenTxnMessage(String messageBody) {
+    return super.getOpenTxnMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public CommitTxnMessage getCommitTxnMessage(String messageBody) {
+    return super.getCommitTxnMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AbortTxnMessage getAbortTxnMessage(String messageBody) {
+    return super.getAbortTxnMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AllocWriteIdMessage getAllocWriteIdMessage(String messageBody) {
+    return super.getAllocWriteIdMessage(deCompress(messageBody));
+  }
+
+  @Override
+  public AcidWriteMessage getAcidWriteMessage(String messageBody) {
+    return super.getAcidWriteMessage(deCompress(messageBody));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java
new file mode 100644
index 0000000..07b01a9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json.gzip;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
+
+/**
+ * {@link MessageEncoder} whose messages are gzipped and then Base64 encoded before being
+ * written out.
+ * Note that this MessageEncoder breaks the backward compatibility for hive replication v1, which
+ * uses webhcat endpoints.
+ */
+public class GzipJSONMessageEncoder implements MessageEncoder {
+  /** Format name returned by {@link #getMessageFormat()} and used for registration. */
+  public static final String FORMAT = "gzip(json-2.0)";
+
+  static {
+    // registers this encoder class with the MessageFactory under its format name at class load
+    MessageFactory.register(FORMAT, GzipJSONMessageEncoder.class);
+  }
+
+  private static DeSerializer deSerializer = new DeSerializer();
+  private static Serializer serializer = new Serializer();
+
+  // volatile so that the unsynchronized read in getInstance() observes the published instance
+  private static volatile MessageEncoder instance;
+
+  /**
+   * Returns the shared encoder, creating it on first use.
+   *
+   * @return the single instance held by this class
+   */
+  public static MessageEncoder getInstance() {
+    MessageEncoder existing = instance;
+    if (existing != null) {
+      return existing;
+    }
+    synchronized (GzipJSONMessageEncoder.class) {
+      // re-check under the lock: another thread may have created the instance meanwhile
+      if (instance == null) {
+        instance = new GzipJSONMessageEncoder();
+      }
+      return instance;
+    }
+  }
+
+  /** @return the deserializer shared by all users of this encoder */
+  @Override
+  public MessageDeserializer getDeserializer() {
+    return deSerializer;
+  }
+
+  /** @return the serializer shared by all users of this encoder */
+  @Override
+  public MessageSerializer getSerializer() {
+    return serializer;
+  }
+
+  /** @return {@link #FORMAT} */
+  @Override
+  public String getMessageFormat() {
+    return FORMAT;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
new file mode 100644
index 0000000..c40600d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.metastore.messaging.json.gzip;
+
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * {@link MessageSerializer} that gzips the default serialized form of a message and Base64
+ * encodes the compressed bytes.
+ */
+class Serializer implements MessageSerializer {
+  private static final Logger LOG = LoggerFactory.getLogger(Serializer.class.getName());
+
+  /**
+   * Serializes the message with the interface's default implementation, then compresses and
+   * encodes the result.
+   *
+   * @param message the event message to serialize
+   * @return Base64 text of the gzip compressed, UTF-8 encoded serialized message
+   * @throws RuntimeException wrapping the IOException raised while compressing
+   */
+  @Override
+  public String serialize(EventMessage message) {
+    String messageAsString = MessageSerializer.super.serialize(message);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // try-with-resources closes the gzip stream even when write() fails, so it is never leaked
+    try (GZIPOutputStream gout = new GZIPOutputStream(baos)) {
+      gout.write(messageAsString.getBytes(StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      LOG.error("could not use gzip output stream", e);
+      // parameterized so the (possibly large) message is only formatted when debug is enabled
+      LOG.debug("message {}", messageAsString);
+      throw new RuntimeException("could not use the gzip output Stream", e);
+    }
+    // read the buffer only after the gzip stream has been closed, i.e. fully flushed
+    return Base64.getEncoder().encodeToString(baos.toByteArray());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
index 3d36b60..b01a632 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
@@ -23,6 +23,7 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,8 @@ import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
public class MetaStoreTestUtils {
+ private static Map<Integer, Thread> map = new HashMap<>();
+
private static final Logger LOG = LoggerFactory.getLogger(MetaStoreTestUtils.class);
private static final String TMP_DIR = System.getProperty("test.tmp.dir");
public static final int RETRY_COUNT = 10;
@@ -75,9 +78,17 @@ public class MetaStoreTestUtils {
}, "MetaStoreThread-" + port);
thread.setDaemon(true);
thread.start();
+ map.put(port,thread);
MetaStoreTestUtils.loopUntilHMSReady(port);
}
+ /**
+  * Stops the thread that was recorded for the given port, if there is one, and drops it from
+  * the port-to-thread map.
+  *
+  * @param port port under which the thread was stored in the map
+  */
+ public static void close(final int port){
+   // remove instead of get, so the stopped thread is not kept reachable from the static map
+   Thread thread = map.remove(port);
+   if(thread != null){
+     // NOTE(review): Thread.stop() is deprecated as inherently unsafe; kept because no other
+     // way to end the thread is visible here - confirm whether a cleaner shutdown exists
+     thread.stop();
+   }
+ }
+
public static int startMetaStoreWithRetry(final HadoopThriftAuthBridge bridge) throws Exception {
return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, MetastoreConf.newMetastoreConf());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/testutils/ptest2/conf/deployed/master-mr2.properties
----------------------------------------------------------------------
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties
index 23ad0f6..ad5405f 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -68,7 +68,7 @@ ut.service.batchSize=8
unitTests.module.itests.hive-unit=itests.hive-unit
ut.itests.hive-unit.batchSize=9
-ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr
+ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat
unitTests.module.itests.qtest=itests.qtest
ut.itests.qtest.batchSize=9