You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/07 22:43:24 UTC
[1/2] hive git commit: HIVE-16268 : enable incremental repl dump to
handle functions metadata (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master 699d6ce36 -> 9d4f13afd
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
deleted file mode 100644
index 29f3b42..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public interface EventHandler {
- void handle(Context withinContext) throws Exception;
-
- long fromEventId();
-
- long toEventId();
-
- DumpType dumpType();
-
- class Context {
- final Path eventRoot, cmRoot;
- final Hive db;
- final HiveConf hiveConf;
- final ReplicationSpec replicationSpec;
-
- public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
- ReplicationSpec replicationSpec) {
- this.eventRoot = eventRoot;
- this.cmRoot = cmRoot;
- this.db = db;
- this.hiveConf = hiveConf;
- this.replicationSpec = replicationSpec;
- }
-
- DumpMetaData createDmd(EventHandler eventHandler) {
- return new DumpMetaData(
- eventRoot,
- eventHandler.dumpType(),
- eventHandler.fromEventId(),
- eventHandler.toEventId(),
- cmRoot, hiveConf
- );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
deleted file mode 100644
index 53adea8..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Modifier;
-import java.util.HashMap;
-import java.util.Map;
-
-public class EventHandlerFactory {
- private EventHandlerFactory() {
- }
-
- private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>();
-
- static {
- register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
- register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
- register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
- register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
- register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
- register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
- register(MessageFactory.INSERT_EVENT, InsertHandler.class);
- }
-
- static void register(String event, Class<? extends EventHandler> handlerClazz) {
- try {
- Constructor<? extends EventHandler> constructor =
- handlerClazz.getDeclaredConstructor(NotificationEvent.class);
- assert constructor != null;
- assert !Modifier.isPrivate(constructor.getModifiers());
- registeredHandlers.put(event, handlerClazz);
- } catch (NoSuchMethodException e) {
- throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
- + " does not have the a constructor with only parameter of type:"
- + NotificationEvent.class.getCanonicalName(), e);
- }
- }
-
- public static EventHandler handlerFor(NotificationEvent event) {
- if (registeredHandlers.containsKey(event.getEventType())) {
- Class<? extends EventHandler> handlerClazz = registeredHandlers.get(event.getEventType());
- try {
- Constructor<? extends EventHandler> constructor =
- handlerClazz.getDeclaredConstructor(NotificationEvent.class);
- return constructor.newInstance(event);
- } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
- // this should never happen. however we want to make sure we propagate the exception
- throw new RuntimeException(
- "failed when creating handler for " + event.getEventType()
- + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
- }
- }
- return new DefaultHandler(event);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
deleted file mode 100644
index 910b396..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.thrift.TException;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class InsertHandler extends AbstractHandler {
-
- InsertHandler(NotificationEvent event) {
- super(event);
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
- org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
- Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
- List<Partition> qlPtns = null;
- if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
- qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
- }
- Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-
- // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation
- withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
- EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
- qlMdTable, qlPtns,
- withinContext.replicationSpec);
- Iterable<String> files = insertMsg.getFiles();
-
- if (files != null) {
- Path dataPath;
- if ((null == qlPtns) || qlPtns.isEmpty()) {
- dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
- } else {
- /*
- * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple
- * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list
- * will have only one entry.
- */
- assert(1 == qlPtns.size());
- dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
- }
-
- // encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
- for (String file : files) {
- fileListWriter.write(file + "\n");
- }
- }
- }
-
- LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
- DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
- dmd.write();
- }
-
- private org.apache.hadoop.hive.ql.metadata.Table tableObject(
- Context withinContext, InsertMessage insertMsg) throws TException {
- return new org.apache.hadoop.hive.ql.metadata.Table(
- withinContext.db.getMSC().getTable(
- insertMsg.getDB(), insertMsg.getTable()
- )
- );
- }
-
- private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
- Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
- FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
- return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
- }
-
- @Override
- public DumpType dumpType() {
- return DumpType.EVENT_INSERT;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
new file mode 100644
index 0000000..95e51e4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+abstract class AbstractMessageHandler implements MessageHandler {
+ final HashSet<ReadEntity> readEntitySet = new HashSet<>();
+ final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
+ final Map<String, Long> tablesUpdated = new HashMap<>(),
+ databasesUpdated = new HashMap<>();
+ final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+
+ @Override
+ public Set<ReadEntity> readEntities() {
+ return readEntitySet;
+ }
+
+ @Override
+ public Set<WriteEntity> writeEntities() {
+ return writeEntitySet;
+ }
+
+ @Override
+ public Map<String, Long> tablesUpdated() {
+ return tablesUpdated;
+ }
+
+ @Override
+ public Map<String, Long> databasesUpdated() {
+ return databasesUpdated;
+ }
+
+ ReplicationSpec eventOnlyReplicationSpec(Context forContext) throws SemanticException {
+ String eventId = forContext.dmd.getEventTo().toString();
+ return replicationSpec(eventId, eventId);
+ }
+
+ private ReplicationSpec replicationSpec(String fromId, String toId) throws SemanticException {
+ return new ReplicationSpec(true, false, fromId, toId, false, true, false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
new file mode 100644
index 0000000..6d346b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+class DefaultHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context withinContext)
+ throws SemanticException {
+ return new ArrayList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
new file mode 100644
index 0000000..73f2613
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class DropPartitionHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context)
+ throws SemanticException {
+ try {
+ DropPartitionMessage msg = deserializer.getDropPartitionMessage(context.dmd.getPayload());
+ String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+ String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+ Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
+ genPartSpecs(new Table(msg.getTableObj()),
+ msg.getPartitions());
+ if (partSpecs.size() > 0) {
+ DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName,
+ partSpecs, null, true, eventOnlyReplicationSpec(context));
+ Task<DDLWork> dropPtnTask = TaskFactory.get(
+ new DDLWork(readEntitySet, writeEntitySet, dropPtnDesc),
+ context.hiveConf
+ );
+ context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
+ dropPtnDesc.getTableName(), msg.getPartitions());
+ databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+ tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ return Collections.singletonList(dropPtnTask);
+ } else {
+ throw new SemanticException(
+ "DROP PARTITION EVENT does not return any part descs for event message :"
+ + context.dmd.getPayload());
+ }
+ } catch (Exception e) {
+ throw (e instanceof SemanticException)
+ ? (SemanticException) e
+ : new SemanticException("Error reading message members", e);
+ }
+ }
+
+ private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
+ List<Map<String, String>> partitions) throws SemanticException {
+ Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
+ int partPrefixLength = 0;
+ if (partitions.size() > 0) {
+ partPrefixLength = partitions.get(0).size();
+ // pick the length of the first ptn, we expect all ptns listed to have the same number of
+ // key-vals.
+ }
+ List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
+ for (Map<String, String> ptn : partitions) {
+ // convert each key-value-map to appropriate expression.
+ ExprNodeGenericFuncDesc expr = null;
+ for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+ String key = kvp.getKey();
+ Object val = kvp.getValue();
+ String type = table.getPartColByName(key).getType();
+ PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+ ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+ ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
+ "=", column, new ExprNodeConstantDesc(pti, val));
+ expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+ }
+ if (expr != null) {
+ partitionDesc.add(expr);
+ }
+ }
+ if (partitionDesc.size() > 0) {
+ partSpecs.put(partPrefixLength, partitionDesc);
+ }
+ return partSpecs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
new file mode 100644
index 0000000..b623f2f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+class DropTableHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context)
+ throws SemanticException {
+ DropTableMessage msg = deserializer.getDropTableMessage(context.dmd.getPayload());
+ String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+ String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+ DropTableDesc dropTableDesc = new DropTableDesc(
+ actualDbName + "." + actualTblName,
+ null, true, true,
+ eventOnlyReplicationSpec(context));
+ Task<DDLWork> dropTableTask = TaskFactory.get(
+ new DDLWork(readEntitySet, writeEntitySet, dropTableDesc),
+ context.hiveConf
+ );
+ context.log
+ .debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+ databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+ return Collections.singletonList(dropTableTask);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
new file mode 100644
index 0000000..fa63169
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.List;
+
+class InsertHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context withinContext)
+ throws SemanticException {
+ InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload());
+ String actualDbName =
+ withinContext.isDbNameEmpty() ? insertMessage.getDB() : withinContext.dbName;
+ String actualTblName =
+ withinContext.isTableNameEmpty() ? insertMessage.getTable() : withinContext.tableName;
+
+ Context currentContext = new Context(withinContext, actualDbName, actualTblName);
+ // Piggybacking in Import logic for now
+ TableHandler tableHandler = new TableHandler();
+ List<Task<? extends Serializable>> tasks = tableHandler.handle(currentContext);
+ readEntitySet.addAll(tableHandler.readEntities());
+ writeEntitySet.addAll(tableHandler.writeEntities());
+ databasesUpdated.putAll(tableHandler.databasesUpdated);
+ tablesUpdated.putAll(tableHandler.tablesUpdated);
+ return tasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
new file mode 100644
index 0000000..840f95e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+public interface MessageHandler {
+
+ List<Task<? extends Serializable>> handle(Context withinContext) throws SemanticException;
+
+ Set<ReadEntity> readEntities();
+
+ Set<WriteEntity> writeEntities();
+
+ Map<String, Long> tablesUpdated();
+
+ Map<String, Long> databasesUpdated();
+
+ class Context {
+ final String dbName, tableName, location;
+ final Task<? extends Serializable> precursor;
+ DumpMetaData dmd;
+ final HiveConf hiveConf;
+ final Hive db;
+ final org.apache.hadoop.hive.ql.Context nestedContext;
+ final Logger log;
+
+ public Context(String dbName, String tableName, String location,
+ Task<? extends Serializable> precursor, DumpMetaData dmd, HiveConf hiveConf,
+ Hive db, org.apache.hadoop.hive.ql.Context nestedContext, Logger log) {
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.location = location;
+ this.precursor = precursor;
+ this.dmd = dmd;
+ this.hiveConf = hiveConf;
+ this.db = db;
+ this.nestedContext = nestedContext;
+ this.log = log;
+ }
+
+ public Context(Context other, String dbName, String tableName) {
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.location = other.location;
+ this.precursor = other.precursor;
+ this.dmd = other.dmd;
+ this.hiveConf = other.hiveConf;
+ this.db = other.db;
+ this.nestedContext = other.nestedContext;
+ this.log = other.log;
+ }
+
+ boolean isTableNameEmpty() {
+ return StringUtils.isEmpty(tableName);
+ }
+
+ boolean isDbNameEmpty() {
+ return StringUtils.isEmpty(dbName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
new file mode 100644
index 0000000..de6ff74
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+public class MessageHandlerFactory {
+ private static Map<DumpType, Class<? extends MessageHandler>> messageHandlers = new HashMap<>();
+
+ static {
+ register(DumpType.EVENT_DROP_PARTITION, DropPartitionHandler.class);
+ register(DumpType.EVENT_DROP_TABLE, DropTableHandler.class);
+ register(DumpType.EVENT_INSERT, InsertHandler.class);
+ register(DumpType.EVENT_RENAME_PARTITION, RenamePartitionHandler.class);
+ register(DumpType.EVENT_RENAME_TABLE, RenameTableHandler.class);
+
+ register(DumpType.EVENT_CREATE_TABLE, TableHandler.class);
+ register(DumpType.EVENT_ADD_PARTITION, TableHandler.class);
+ register(DumpType.EVENT_ALTER_TABLE, TableHandler.class);
+ register(DumpType.EVENT_ALTER_PARTITION, TableHandler.class);
+
+ register(DumpType.EVENT_TRUNCATE_PARTITION, TruncatePartitionHandler.class);
+ register(DumpType.EVENT_TRUNCATE_TABLE, TruncateTableHandler.class);
+ }
+
+ private static void register(DumpType eventType, Class<? extends MessageHandler> handlerClazz) {
+ try {
+ Constructor<? extends MessageHandler> constructor =
+ handlerClazz.getDeclaredConstructor();
+ assert constructor != null;
+ assert !Modifier.isPrivate(constructor.getModifiers());
+ messageHandlers.put(eventType, handlerClazz);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+ + " does not have the a constructor with only parameter of type:"
+ + NotificationEvent.class.getCanonicalName(), e);
+ }
+ }
+
+ public static MessageHandler handlerFor(DumpType eventType) {
+ if (messageHandlers.containsKey(eventType)) {
+ Class<? extends MessageHandler> handlerClazz = messageHandlers.get(eventType);
+ try {
+ Constructor<? extends MessageHandler> constructor =
+ handlerClazz.getDeclaredConstructor();
+ return constructor.newInstance();
+ } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ // this should never happen. however we want to make sure we propagate the exception
+ throw new RuntimeException(
+ "failed when creating handler for " + eventType
+ + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
+ }
+ }
+ return new DefaultHandler();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
new file mode 100644
index 0000000..658f2ba
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class RenamePartitionHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context)
+ throws SemanticException {
+
+ AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
+ String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+ String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+
+ Map<String, String> newPartSpec = new LinkedHashMap<>();
+ Map<String, String> oldPartSpec = new LinkedHashMap<>();
+ String tableName = actualDbName + "." + actualTblName;
+ try {
+ Table tblObj = msg.getTableObj();
+ Iterator<String> beforeIterator = msg.getPtnObjBefore().getValuesIterator();
+ Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
+ for (FieldSchema fs : tblObj.getPartitionKeys()) {
+ oldPartSpec.put(fs.getName(), beforeIterator.next());
+ newPartSpec.put(fs.getName(), afterIterator.next());
+ }
+ } catch (Exception e) {
+ throw (e instanceof SemanticException)
+ ? (SemanticException) e
+ : new SemanticException("Error reading message members", e);
+ }
+
+ RenamePartitionDesc renamePtnDesc =
+ new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec);
+ Task<DDLWork> renamePtnTask = TaskFactory.get(
+ new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf
+ );
+ context.log
+ .debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec,
+ newPartSpec);
+ databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+ tablesUpdated.put(tableName, context.dmd.getEventTo());
+ return Collections.singletonList(renamePtnTask);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
new file mode 100644
index 0000000..2c429c1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+class RenameTableHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context)
+ throws SemanticException {
+
+ AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload());
+ if (!context.isTableNameEmpty()) {
+ throw new SemanticException(
+ "RENAMES of tables are not supported for table-level replication");
+ }
+ try {
+ String oldDbName = msg.getTableObjBefore().getDbName();
+ String newDbName = msg.getTableObjAfter().getDbName();
+
+ if (!context.isDbNameEmpty()) {
+ // If we're loading into a db, instead of into the warehouse, then the oldDbName and
+ // newDbName must be the same
+ if (!oldDbName.equalsIgnoreCase(newDbName)) {
+ throw new SemanticException("Cannot replicate an event renaming a table across"
+ + " databases into a db level load " + oldDbName + "->" + newDbName);
+ } else {
+ // both were the same, and can be replaced by the new db we're loading into.
+ oldDbName = context.dbName;
+ newDbName = context.dbName;
+ }
+ }
+
+ String oldName = oldDbName + "." + msg.getTableObjBefore().getTableName();
+ String newName = newDbName + "." + msg.getTableObjAfter().getTableName();
+ AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false);
+ Task<DDLWork> renameTableTask = TaskFactory.get(
+ new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf
+ );
+ context.log.debug(
+ "Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName
+ );
+ // oldDbName and newDbName *will* be the same if we're here
+ databasesUpdated.put(newDbName, context.dmd.getEventTo());
+ tablesUpdated.remove(oldName);
+ tablesUpdated.put(newName, context.dmd.getEventTo());
+ // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated
+ // However, we explicitly don't support repl of that sort, and error out above if so. If that should
+ // ever change, this will need reworking.
+ return Collections.singletonList(renameTableTask);
+ } catch (Exception e) {
+ throw (e instanceof SemanticException)
+ ? (SemanticException) e
+ : new SemanticException("Error reading message members", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
new file mode 100644
index 0000000..2db8385
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+class TableHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+ // Path being passed to us is a table dump location. We go ahead and load it in as needed.
+ // If tblName is null, then we default to the table name specified in _metadata, which is good.
+ // or are both specified, in which case, that's what we are intended to create the new table as.
+ if (context.isDbNameEmpty()) {
+ throw new SemanticException("Database name cannot be null for a table load");
+ }
+ try {
+ // TODO: why not have the below variables as static / inline seems to have no possibility of updates back here
+
+ // no location set on repl loads
+ boolean isLocationSet = false;
+ // all repl imports are non-external
+ boolean isExternalSet = false;
+ // bootstrap loads are not partition level
+ boolean isPartSpecSet = false;
+ // repl loads are not partition level
+ LinkedHashMap<String, String> parsedPartSpec = null;
+ // no location for repl imports
+ String parsedLocation = null;
+ List<Task<? extends Serializable>> importTasks = new ArrayList<>();
+
+ EximUtil.SemanticAnalyzerWrapperContext x =
+ new EximUtil.SemanticAnalyzerWrapperContext(
+ context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log,
+ context.nestedContext);
+ ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
+ (context.precursor != null), parsedLocation, context.tableName, context.dbName,
+ parsedPartSpec, context.location, x,
+ databasesUpdated, tablesUpdated);
+
+ return importTasks;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
new file mode 100644
index 0000000..5436f0d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class TruncatePartitionHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+ AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
+ String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+ String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+
+ Map<String, String> partSpec = new LinkedHashMap<>();
+ try {
+ org.apache.hadoop.hive.metastore.api.Table tblObj = msg.getTableObj();
+ Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
+ for (FieldSchema fs : tblObj.getPartitionKeys()) {
+ partSpec.put(fs.getName(), afterIterator.next());
+ }
+ } catch (Exception e) {
+ if (!(e instanceof SemanticException)) {
+ throw new SemanticException("Error reading message members", e);
+ } else {
+ throw (SemanticException) e;
+ }
+ }
+
+ TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+ actualDbName + "." + actualTblName, partSpec);
+ Task<DDLWork> truncatePtnTask =
+ TaskFactory.get(
+ new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc),
+ context.hiveConf
+ );
+ context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(),
+ truncateTableDesc.getTableName());
+ databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+ return Collections.singletonList(truncatePtnTask);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
new file mode 100644
index 0000000..731383c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+class TruncateTableHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+ AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload());
+ String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+ String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+
+ TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+ actualDbName + "." + actualTblName, null);
+ Task<DDLWork> truncateTableTask = TaskFactory.get(
+ new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc),
+ context.hiveConf
+ );
+
+ context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(),
+ truncateTableDesc.getTableName());
+ databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+ return Collections.singletonList(truncateTableTask);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
new file mode 100644
index 0000000..c689e6f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestEventHandlerFactory {
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
+ class NonCompatibleEventHandler implements EventHandler {
+ @Override
+ public void handle(Context withinContext) throws Exception {
+
+ }
+
+ @Override
+ public long fromEventId() {
+ return 0;
+ }
+
+ @Override
+ public long toEventId() {
+ return 0;
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return null;
+ }
+ }
+ EventHandlerFactory.register("anyEvent", NonCompatibleEventHandler.class);
+ }
+
+ @Test
+ public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
+ EventHandler eventHandler =
+ EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
+ "shouldGiveDefaultHandler", "s"));
+ assertTrue(eventHandler instanceof DefaultHandler);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
deleted file mode 100644
index 4b802c4..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestEventHandlerFactory {
- @Test(expected = IllegalArgumentException.class)
- public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
- class NonCompatibleEventHandler implements EventHandler {
- @Override
- public void handle(Context withinContext) throws Exception {
-
- }
-
- @Override
- public long fromEventId() {
- return 0;
- }
-
- @Override
- public long toEventId() {
- return 0;
- }
-
- @Override
- public DumpType dumpType() {
- return null;
- }
- }
- EventHandlerFactory.register("anyEvent", NonCompatibleEventHandler.class);
- }
-
- @Test
- public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
- EventHandler eventHandler =
- EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
- "shouldGiveDefaultHandler", "s"));
- assertTrue(eventHandler instanceof DefaultHandler);
- }
-
-}
\ No newline at end of file
[2/2] hive git commit: HIVE-16268 : enable incremental repl dump to
handle functions metadata (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Posted by kh...@apache.org.
HIVE-16268 : enable incremental repl dump to handle functions metadata (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9d4f13af
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9d4f13af
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9d4f13af
Branch: refs/heads/master
Commit: 9d4f13afda34250b7cf722287a557426e90ff24d
Parents: 699d6ce
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Thu May 4 02:48:27 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Sun May 7 15:43:21 2017 -0700
----------------------------------------------------------------------
.../ql/parse/ReplicationSemanticAnalyzer.java | 306 ++-----------------
.../hadoop/hive/ql/parse/repl/DumpType.java | 1 +
.../parse/repl/dump/events/AbstractHandler.java | 46 +++
.../repl/dump/events/AddPartitionHandler.java | 114 +++++++
.../repl/dump/events/AlterPartitionHandler.java | 112 +++++++
.../repl/dump/events/AlterTableHandler.java | 102 +++++++
.../repl/dump/events/CreateFunctionHandler.java | 36 +++
.../repl/dump/events/CreateTableHandler.java | 86 ++++++
.../parse/repl/dump/events/DefaultHandler.java | 44 +++
.../repl/dump/events/DropPartitionHandler.java | 44 +++
.../repl/dump/events/DropTableHandler.java | 44 +++
.../ql/parse/repl/dump/events/EventHandler.java | 62 ++++
.../repl/dump/events/EventHandlerFactory.java | 76 +++++
.../parse/repl/dump/events/InsertHandler.java | 110 +++++++
.../ql/parse/repl/events/AbstractHandler.java | 46 ---
.../parse/repl/events/AddPartitionHandler.java | 114 -------
.../repl/events/AlterPartitionHandler.java | 112 -------
.../ql/parse/repl/events/AlterTableHandler.java | 102 -------
.../parse/repl/events/CreateTableHandler.java | 86 ------
.../ql/parse/repl/events/DefaultHandler.java | 44 ---
.../parse/repl/events/DropPartitionHandler.java | 44 ---
.../ql/parse/repl/events/DropTableHandler.java | 44 ---
.../hive/ql/parse/repl/events/EventHandler.java | 62 ----
.../parse/repl/events/EventHandlerFactory.java | 75 -----
.../ql/parse/repl/events/InsertHandler.java | 110 -------
.../load/message/AbstractMessageHandler.java | 67 ++++
.../parse/repl/load/message/DefaultHandler.java | 33 ++
.../repl/load/message/DropPartitionHandler.java | 108 +++++++
.../repl/load/message/DropTableHandler.java | 51 ++++
.../parse/repl/load/message/InsertHandler.java | 47 +++
.../parse/repl/load/message/MessageHandler.java | 91 ++++++
.../load/message/MessageHandlerFactory.java | 79 +++++
.../load/message/RenamePartitionHandler.java | 74 +++++
.../repl/load/message/RenameTableHandler.java | 81 +++++
.../parse/repl/load/message/TableHandler.java | 68 +++++
.../load/message/TruncatePartitionHandler.java | 69 +++++
.../repl/load/message/TruncateTableHandler.java | 50 +++
.../dump/events/TestEventHandlerFactory.java | 62 ++++
.../repl/events/TestEventHandlerFactory.java | 62 ----
39 files changed, 1781 insertions(+), 1183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 2daa123..5d1d2fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -27,22 +27,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
-import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -53,29 +46,23 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
-import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandlerFactory;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FunctionWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
-import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +74,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -681,270 +667,26 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private List<Task<? extends Serializable>> analyzeEventLoad(
- String dbName, String tblName, String locn,
- Task<? extends Serializable> precursor,
- Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated,
- DumpMetaData dmd) throws SemanticException {
- MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
- switch (dmd.getDumpType()) {
- case EVENT_CREATE_TABLE: {
- return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
- }
- case EVENT_ADD_PARTITION: {
- return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
- }
- case EVENT_DROP_TABLE: {
- DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload());
- String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropTableMessage.getDB() : dbName);
- String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropTableMessage.getTable() : tblName);
- DropTableDesc dropTableDesc = new DropTableDesc(
- actualDbName + "." + actualTblName,
- null, true, true,
- getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
- Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf);
- if (precursor != null){
- precursor.addDependentTask(dropTableTask);
- }
- List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
- tasks.add(dropTableTask);
- LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
- dbsUpdated.put(actualDbName,dmd.getEventTo());
- return tasks;
- }
- case EVENT_DROP_PARTITION: {
- try {
- DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
- String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropPartitionMessage.getDB() : dbName);
- String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropPartitionMessage.getTable() : tblName);
- Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs;
- partSpecs =
- genPartSpecs(new Table(dropPartitionMessage.getTableObj()),
- dropPartitionMessage.getPartitions());
- if (partSpecs.size() > 0) {
- DropTableDesc dropPtnDesc = new DropTableDesc(
- actualDbName + "." + actualTblName,
- partSpecs, null, true,
- getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
- Task<DDLWork> dropPtnTask =
- TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
- if (precursor != null) {
- precursor.addDependentTask(dropPtnTask);
- }
- List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
- tasks.add(dropPtnTask);
- LOG.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
- dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
- dbsUpdated.put(actualDbName, dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, dmd.getEventTo());
- return tasks;
- } else {
- throw new SemanticException(
- "DROP PARTITION EVENT does not return any part descs for event message :"
- + dmd.getPayload());
- }
- } catch (Exception e) {
- if (!(e instanceof SemanticException)){
- throw new SemanticException("Error reading message members", e);
- } else {
- throw (SemanticException)e;
- }
- }
- }
- case EVENT_ALTER_TABLE: {
- return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
- }
- case EVENT_RENAME_TABLE: {
- AlterTableMessage renameTableMessage = md.getAlterTableMessage(dmd.getPayload());
- if ((tblName != null) && (!tblName.isEmpty())){
- throw new SemanticException("RENAMES of tables are not supported for table-level replication");
- }
- try {
- String oldDbName = renameTableMessage.getTableObjBefore().getDbName();
- String newDbName = renameTableMessage.getTableObjAfter().getDbName();
-
- if ((dbName != null) && (!dbName.isEmpty())){
- // If we're loading into a db, instead of into the warehouse, then the oldDbName and
- // newDbName must be the same
- if (!oldDbName.equalsIgnoreCase(newDbName)){
- throw new SemanticException("Cannot replicate an event renaming a table across"
- + " databases into a db level load " + oldDbName +"->" + newDbName);
- } else {
- // both were the same, and can be replaced by the new db we're loading into.
- oldDbName = dbName;
- newDbName = dbName;
- }
- }
-
- String oldName = oldDbName + "." + renameTableMessage.getTableObjBefore().getTableName();
- String newName = newDbName + "." + renameTableMessage.getTableObjAfter().getTableName();
- AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false);
- Task<DDLWork> renameTableTask = TaskFactory.get(new DDLWork(inputs, outputs, renameTableDesc), conf);
- if (precursor != null){
- precursor.addDependentTask(renameTableTask);
- }
- List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
- tasks.add(renameTableTask);
- LOG.debug("Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName);
- dbsUpdated.put(newDbName, dmd.getEventTo()); // oldDbName and newDbName *will* be the same if we're here
- tablesUpdated.remove(oldName);
- tablesUpdated.put(newName, dmd.getEventTo());
- // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated
- // However, we explicitly don't support repl of that sort, and error out above if so. If that should
- // ever change, this will need reworking.
- return tasks;
- } catch (Exception e) {
- if (!(e instanceof SemanticException)){
- throw new SemanticException("Error reading message members", e);
- } else {
- throw (SemanticException)e;
- }
- }
- }
- case EVENT_TRUNCATE_TABLE: {
- AlterTableMessage truncateTableMessage = md.getAlterTableMessage(dmd.getPayload());
- String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncateTableMessage.getDB() : dbName);
- String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncateTableMessage.getTable() : tblName);
-
- TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
- actualDbName + "." + actualTblName, null);
- Task<DDLWork> truncateTableTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
- if (precursor != null) {
- precursor.addDependentTask(truncateTableTask);
- }
-
- List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
- tasks.add(truncateTableTask);
- LOG.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName());
- dbsUpdated.put(actualDbName,dmd.getEventTo());
- return tasks;
- }
- case EVENT_ALTER_PARTITION: {
- return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
- }
- case EVENT_RENAME_PARTITION: {
- AlterPartitionMessage renamePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
- String actualDbName = ((dbName == null) || dbName.isEmpty() ? renamePtnMessage.getDB() : dbName);
- String actualTblName = ((tblName == null) || tblName.isEmpty() ? renamePtnMessage.getTable() : tblName);
-
- Map<String, String> newPartSpec = new LinkedHashMap<String,String>();
- Map<String, String> oldPartSpec = new LinkedHashMap<String,String>();
- String tableName = actualDbName + "." + actualTblName;
- try {
- org.apache.hadoop.hive.metastore.api.Table tblObj = renamePtnMessage.getTableObj();
- org.apache.hadoop.hive.metastore.api.Partition pobjBefore = renamePtnMessage.getPtnObjBefore();
- org.apache.hadoop.hive.metastore.api.Partition pobjAfter = renamePtnMessage.getPtnObjAfter();
- Iterator<String> beforeValIter = pobjBefore.getValuesIterator();
- Iterator<String> afterValIter = pobjAfter.getValuesIterator();
- for (FieldSchema fs : tblObj.getPartitionKeys()){
- oldPartSpec.put(fs.getName(), beforeValIter.next());
- newPartSpec.put(fs.getName(), afterValIter.next());
- }
- } catch (Exception e) {
- if (!(e instanceof SemanticException)){
- throw new SemanticException("Error reading message members", e);
- } else {
- throw (SemanticException)e;
- }
- }
-
- RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec);
- Task<DDLWork> renamePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, renamePtnDesc), conf);
- if (precursor != null){
- precursor.addDependentTask(renamePtnTask);
- }
- List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
- tasks.add(renamePtnTask);
- LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec);
- dbsUpdated.put(actualDbName, dmd.getEventTo());
- tablesUpdated.put(tableName, dmd.getEventTo());
- return tasks;
- }
- case EVENT_TRUNCATE_PARTITION: {
- AlterPartitionMessage truncatePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
- String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncatePtnMessage.getDB() : dbName);
- String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncatePtnMessage.getTable() : tblName);
-
- Map<String, String> partSpec = new LinkedHashMap<String,String>();
- try {
- org.apache.hadoop.hive.metastore.api.Table tblObj = truncatePtnMessage.getTableObj();
- org.apache.hadoop.hive.metastore.api.Partition pobjAfter = truncatePtnMessage.getPtnObjAfter();
- Iterator<String> afterValIter = pobjAfter.getValuesIterator();
- for (FieldSchema fs : tblObj.getPartitionKeys()){
- partSpec.put(fs.getName(), afterValIter.next());
- }
- } catch (Exception e) {
- if (!(e instanceof SemanticException)){
- throw new SemanticException("Error reading message members", e);
- } else {
- throw (SemanticException)e;
- }
- }
-
- TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
- actualDbName + "." + actualTblName, partSpec);
- Task<DDLWork> truncatePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
- if (precursor != null) {
- precursor.addDependentTask(truncatePtnTask);
- }
-
- List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
- tasks.add(truncatePtnTask);
- LOG.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName());
- dbsUpdated.put(actualDbName,dmd.getEventTo());
- return tasks;
- }
- case EVENT_INSERT: {
- md = MessageFactory.getInstance().getDeserializer();
- InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
- String actualDbName = ((dbName == null) || dbName.isEmpty() ? insertMessage.getDB() : dbName);
- String actualTblName = ((tblName == null) || tblName.isEmpty() ? insertMessage.getTable() : tblName);
-
- // Piggybacking in Import logic for now
- return analyzeTableLoad(actualDbName, actualTblName, locn, precursor, dbsUpdated, tablesUpdated);
- }
- case EVENT_UNKNOWN: {
- break;
- }
- default: {
- break;
- }
- }
- return null;
- }
-
- private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
- List<Map<String, String>> partitions) throws SemanticException {
- Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
- new HashMap<Integer, List<ExprNodeGenericFuncDesc>>();
- int partPrefixLength = 0;
- if ((partitions != null) && (partitions.size() > 0)) {
- partPrefixLength = partitions.get(0).size();
- // pick the length of the first ptn, we expect all ptns listed to have the same number of
- // key-vals.
- }
- List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>();
- for (Map<String, String> ptn : partitions) {
- // convert each key-value-map to appropriate expression.
- ExprNodeGenericFuncDesc expr = null;
- for (Map.Entry<String, String> kvp : ptn.entrySet()) {
- String key = kvp.getKey();
- Object val = kvp.getValue();
- String type = table.getPartColByName(key).getType();
- ;
- PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
- ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
- ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
- "=", column, new ExprNodeConstantDesc(pti, val));
- expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
- }
- if (expr != null) {
- ptnDescs.add(expr);
+ String dbName, String tblName, String locn, Task<? extends Serializable> precursor,
+ Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd)
+ throws SemanticException {
+ MessageHandler.Context context =
+ new MessageHandler.Context(dbName, tblName, locn, precursor, dmd, conf, db, ctx, LOG);
+ MessageHandler messageHandler = MessageHandlerFactory.handlerFor(dmd.getDumpType());
+ List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
+
+ if (precursor != null) {
+ for (Task<? extends Serializable> t : tasks) {
+ precursor.addDependentTask(t);
+ LOG.debug("Added {}:{} as a precursor of {}:{}",
+ precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
}
}
- if (ptnDescs.size() > 0) {
- partSpecs.put(partPrefixLength, ptnDescs);
- }
- return partSpecs;
+ dbsUpdated.putAll(messageHandler.databasesUpdated());
+ tablesUpdated.putAll(messageHandler.tablesUpdated());
+ inputs.addAll(messageHandler.readEntities());
+ outputs.addAll(messageHandler.writeEntities());
+ return tasks;
}
private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index b1df5a3..c2cffaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -31,6 +31,7 @@ public enum DumpType {
EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
EVENT_INSERT("EVENT_INSERT"),
+ EVENT_CREATE_FUNCTION("EVENT_CREATE_FUNCTION"),
EVENT_UNKNOWN("EVENT_UNKNOWN");
String type = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
new file mode 100644
index 0000000..ba699e3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractHandler implements EventHandler {
+ static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
+
+ final NotificationEvent event;
+ final MessageDeserializer deserializer;
+
+ AbstractHandler(NotificationEvent event) {
+ this.event = event;
+ deserializer = MessageFactory.getInstance().getDeserializer();
+ }
+
+ @Override
+ public long fromEventId() {
+ return event.getEventId();
+ }
+
+ @Override
+ public long toEventId() {
+ return event.getEventId();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
new file mode 100644
index 0000000..f4239e5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import javax.annotation.Nullable;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+class AddPartitionHandler extends AbstractHandler {
+ protected AddPartitionHandler(NotificationEvent notificationEvent) {
+ super(notificationEvent);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
+ LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
+ Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
+ if ((ptns == null) || (!ptns.iterator().hasNext())) {
+ LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");
+ return;
+ }
+ org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
+ if (tobj == null) {
+ LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
+ return;
+ }
+
+ final Table qlMdTable = new Table(tobj);
+ Iterable<Partition> qlPtns = Iterables.transform(
+ ptns,
+ new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
+ @Nullable
+ @Override
+ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
+ if (input == null) {
+ return null;
+ }
+ try {
+ return new Partition(qlMdTable, input);
+ } catch (HiveException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+ );
+
+ Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+ EximUtil.createExportDump(
+ metaDataPath.getFileSystem(withinContext.hiveConf),
+ metaDataPath,
+ qlMdTable,
+ qlPtns,
+ withinContext.replicationSpec);
+
+ Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
+ for (Partition qlPtn : qlPtns) {
+ Iterable<String> files = partitionFilesIter.next().getFiles();
+ if (files != null) {
+ // encoded filename/checksum of files, write into _files
+ try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
+ for (String file : files) {
+ fileListWriter.write(file + "\n");
+ }
+ }
+ }
+ }
+ withinContext.createDmd(this).write();
+ }
+
+ private BufferedWriter writer(Context withinContext, Partition qlPtn)
+ throws IOException {
+ Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
+ FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
+ Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
+ return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_ADD_PARTITION;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
new file mode 100644
index 0000000..8a7e742
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class AlterPartitionHandler extends AbstractHandler {
+ private final org.apache.hadoop.hive.metastore.api.Partition after;
+ private final org.apache.hadoop.hive.metastore.api.Table tableObject;
+ private final boolean isTruncateOp;
+ private final Scenario scenario;
+
+ AlterPartitionHandler(NotificationEvent event) throws Exception {
+ super(event);
+ AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
+ tableObject = apm.getTableObj();
+ org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
+ after = apm.getPtnObjAfter();
+ isTruncateOp = apm.getIsTruncateOp();
+ scenario = scenarioType(before, after);
+ }
+
+ private enum Scenario {
+ ALTER {
+ @Override
+ DumpType dumpType() {
+ return DumpType.EVENT_ALTER_PARTITION;
+ }
+ },
+ RENAME {
+ @Override
+ DumpType dumpType() {
+ return DumpType.EVENT_RENAME_PARTITION;
+ }
+ },
+ TRUNCATE {
+ @Override
+ DumpType dumpType() {
+ return DumpType.EVENT_TRUNCATE_PARTITION;
+ }
+ };
+
+ abstract DumpType dumpType();
+ }
+
+ private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
+ org.apache.hadoop.hive.metastore.api.Partition after) {
+ Iterator<String> beforeValIter = before.getValuesIterator();
+ Iterator<String> afterValIter = after.getValuesIterator();
+ while(beforeValIter.hasNext()) {
+ if (!beforeValIter.next().equals(afterValIter.next())) {
+ return Scenario.RENAME;
+ }
+ }
+ return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
+
+ if (Scenario.ALTER == scenario) {
+ withinContext.replicationSpec.setIsMetadataOnly(true);
+ Table qlMdTable = new Table(tableObject);
+ List<Partition> partitions = new ArrayList<>();
+ partitions.add(new Partition(qlMdTable, after));
+ Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+ EximUtil.createExportDump(
+ metaDataPath.getFileSystem(withinContext.hiveConf),
+ metaDataPath,
+ qlMdTable,
+ partitions,
+ withinContext.replicationSpec);
+ }
+ DumpMetaData dmd = withinContext.createDmd(this);
+ dmd.setPayload(event.getMessage());
+ dmd.write();
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return scenario.dumpType();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
new file mode 100644
index 0000000..f457f23
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class AlterTableHandler extends AbstractHandler {
+ private final org.apache.hadoop.hive.metastore.api.Table after;
+ private final boolean isTruncateOp;
+ private final Scenario scenario;
+
+ private enum Scenario {
+ ALTER {
+ @Override
+ DumpType dumpType() {
+ return DumpType.EVENT_ALTER_TABLE;
+ }
+ },
+ RENAME {
+ @Override
+ DumpType dumpType() {
+ return DumpType.EVENT_RENAME_TABLE;
+ }
+ },
+ TRUNCATE {
+ @Override
+ DumpType dumpType() {
+ return DumpType.EVENT_TRUNCATE_TABLE;
+ }
+ };
+
+ abstract DumpType dumpType();
+ }
+
+ AlterTableHandler(NotificationEvent event) throws Exception {
+ super(event);
+ AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
+ org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore();
+ after = atm.getTableObjAfter();
+ isTruncateOp = atm.getIsTruncateOp();
+ scenario = scenarioType(before, after);
+ }
+
+ private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
+ org.apache.hadoop.hive.metastore.api.Table after) {
+ if (before.getDbName().equals(after.getDbName())
+ && before.getTableName().equals(after.getTableName())) {
+ return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
+ } else {
+ return Scenario.RENAME;
+ }
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ {
+ LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
+ if (Scenario.ALTER == scenario) {
+ withinContext.replicationSpec.setIsMetadataOnly(true);
+ Table qlMdTableAfter = new Table(after);
+ Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+ EximUtil.createExportDump(
+ metaDataPath.getFileSystem(withinContext.hiveConf),
+ metaDataPath,
+ qlMdTableAfter,
+ null,
+ withinContext.replicationSpec);
+ }
+ DumpMetaData dmd = withinContext.createDmd(this);
+ dmd.setPayload(event.getMessage());
+ dmd.write();
+ }
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return scenario.dumpType();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
new file mode 100644
index 0000000..bebf035
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -0,0 +1,36 @@
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+class CreateFunctionHandler extends AbstractHandler {
+ CreateFunctionHandler(NotificationEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ CreateFunctionMessage createFunctionMessage =
+ deserializer.getCreateFunctionMessage(event.getMessage());
+ LOG.info("Processing#{} CREATE_MESSAGE message : {}", fromEventId(), event.getMessage());
+ Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+ FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
+
+ try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
+ new FunctionSerializer(createFunctionMessage.getFunctionObj())
+ .writeTo(jsonWriter, withinContext.replicationSpec);
+ }
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_CREATE_FUNCTION;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
new file mode 100644
index 0000000..ca3607f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+class CreateTableHandler extends AbstractHandler {
+
+ CreateTableHandler(NotificationEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
+ LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
+ org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
+
+ if (tobj == null) {
+ LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed");
+ return;
+ }
+
+ Table qlMdTable = new Table(tobj);
+ if (qlMdTable.isView()) {
+ withinContext.replicationSpec.setIsMetadataOnly(true);
+ }
+
+ Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+ EximUtil.createExportDump(
+ metaDataPath.getFileSystem(withinContext.hiveConf),
+ metaDataPath,
+ qlMdTable,
+ null,
+ withinContext.replicationSpec);
+
+ Path dataPath = new Path(withinContext.eventRoot, "data");
+ Iterable<String> files = ctm.getFiles();
+ if (files != null) {
+ // encoded filename/checksum of files, write into _files
+ try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+ for (String file : files) {
+ fileListWriter.write(file + "\n");
+ }
+ }
+ }
+ withinContext.createDmd(this).write();
+ }
+
+ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+ FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+ Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+ return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_CREATE_TABLE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
new file mode 100644
index 0000000..0d4665a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DefaultHandler extends AbstractHandler {
+
+ DefaultHandler(NotificationEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
+ DumpMetaData dmd = withinContext.createDmd(this);
+ dmd.setPayload(event.getMessage());
+ dmd.write();
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
new file mode 100644
index 0000000..a4eacc4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DropPartitionHandler extends AbstractHandler {
+
+ DropPartitionHandler(NotificationEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
+ DumpMetaData dmd = withinContext.createDmd(this);
+ dmd.setPayload(event.getMessage());
+ dmd.write();
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_DROP_PARTITION;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
new file mode 100644
index 0000000..40cd5cb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DropTableHandler extends AbstractHandler {
+
+ DropTableHandler(NotificationEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+ DumpMetaData dmd = withinContext.createDmd(this);
+ dmd.setPayload(event.getMessage());
+ dmd.write();
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_DROP_TABLE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
new file mode 100644
index 0000000..c0fa7b2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+public interface EventHandler {
+ void handle(Context withinContext) throws Exception;
+
+ long fromEventId();
+
+ long toEventId();
+
+ DumpType dumpType();
+
+ class Context {
+ final Path eventRoot, cmRoot;
+ final Hive db;
+ final HiveConf hiveConf;
+ final ReplicationSpec replicationSpec;
+
+ public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
+ ReplicationSpec replicationSpec) {
+ this.eventRoot = eventRoot;
+ this.cmRoot = cmRoot;
+ this.db = db;
+ this.hiveConf = hiveConf;
+ this.replicationSpec = replicationSpec;
+ }
+
+ DumpMetaData createDmd(EventHandler eventHandler) {
+ return new DumpMetaData(
+ eventRoot,
+ eventHandler.dumpType(),
+ eventHandler.fromEventId(),
+ eventHandler.toEventId(),
+ cmRoot, hiveConf
+ );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
new file mode 100644
index 0000000..08dbd13
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventHandlerFactory {
+ private EventHandlerFactory() {
+ }
+
+ private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>();
+
+ static {
+ register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
+ register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
+ register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
+ register(MessageFactory.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
+ register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
+ register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
+ register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
+ register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+ }
+
+ static void register(String event, Class<? extends EventHandler> handlerClazz) {
+ try {
+ Constructor<? extends EventHandler> constructor =
+ handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+ assert constructor != null;
+ assert !Modifier.isPrivate(constructor.getModifiers());
+ registeredHandlers.put(event, handlerClazz);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+ + " does not have the a constructor with only parameter of type:"
+ + NotificationEvent.class.getCanonicalName(), e);
+ }
+ }
+
+ public static EventHandler handlerFor(NotificationEvent event) {
+ if (registeredHandlers.containsKey(event.getEventType())) {
+ Class<? extends EventHandler> handlerClazz = registeredHandlers.get(event.getEventType());
+ try {
+ Constructor<? extends EventHandler> constructor =
+ handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+ return constructor.newInstance(event);
+ } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ // this should never happen. however we want to make sure we propagate the exception
+ throw new RuntimeException(
+ "failed when creating handler for " + event.getEventType()
+ + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
+ }
+ }
+ return new DefaultHandler(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
new file mode 100644
index 0000000..0393701
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.thrift.TException;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class InsertHandler extends AbstractHandler {
+
+ InsertHandler(NotificationEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
+ org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
+ Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+ List<Partition> qlPtns = null;
+ if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+ qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
+ }
+ Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+
+ // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation
+ withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
+ EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
+ qlMdTable, qlPtns,
+ withinContext.replicationSpec);
+ Iterable<String> files = insertMsg.getFiles();
+
+ if (files != null) {
+ Path dataPath;
+ if ((null == qlPtns) || qlPtns.isEmpty()) {
+ dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+ } else {
+ /*
+ * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple
+ * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list
+ * will have only one entry.
+ */
+ assert(1 == qlPtns.size());
+ dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
+ }
+
+ // encoded filename/checksum of files, write into _files
+ try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+ for (String file : files) {
+ fileListWriter.write(file + "\n");
+ }
+ }
+ }
+
+ LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
+ DumpMetaData dmd = withinContext.createDmd(this);
+ dmd.setPayload(event.getMessage());
+ dmd.write();
+ }
+
+ private org.apache.hadoop.hive.ql.metadata.Table tableObject(
+ Context withinContext, InsertMessage insertMsg) throws TException {
+ return new org.apache.hadoop.hive.ql.metadata.Table(
+ withinContext.db.getMSC().getTable(
+ insertMsg.getDB(), insertMsg.getTable()
+ )
+ );
+ }
+
+ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+ Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+ FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+ return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_INSERT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
deleted file mode 100644
index ab059c2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractHandler implements EventHandler {
- static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
-
- final NotificationEvent event;
- final MessageDeserializer deserializer;
-
- AbstractHandler(NotificationEvent event) {
- this.event = event;
- deserializer = MessageFactory.getInstance().getDeserializer();
- }
-
- @Override
- public long fromEventId() {
- return event.getEventId();
- }
-
- @Override
- public long toEventId() {
- return event.getEventId();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
deleted file mode 100644
index 1616ab9..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import javax.annotation.Nullable;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Iterator;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public class AddPartitionHandler extends AbstractHandler {
- protected AddPartitionHandler(NotificationEvent notificationEvent) {
- super(notificationEvent);
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
- LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
- Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
- if ((ptns == null) || (!ptns.iterator().hasNext())) {
- LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");
- return;
- }
- org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
- if (tobj == null) {
- LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
- return;
- }
-
- final Table qlMdTable = new Table(tobj);
- Iterable<Partition> qlPtns = Iterables.transform(
- ptns,
- new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
- @Nullable
- @Override
- public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
- if (input == null) {
- return null;
- }
- try {
- return new Partition(qlMdTable, input);
- } catch (HiveException e) {
- throw new IllegalArgumentException(e);
- }
- }
- }
- );
-
- Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
- EximUtil.createExportDump(
- metaDataPath.getFileSystem(withinContext.hiveConf),
- metaDataPath,
- qlMdTable,
- qlPtns,
- withinContext.replicationSpec);
-
- Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
- for (Partition qlPtn : qlPtns) {
- Iterable<String> files = partitionFilesIter.next().getFiles();
- if (files != null) {
- // encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
- for (String file : files) {
- fileListWriter.write(file + "\n");
- }
- }
- }
- }
- withinContext.createDmd(this).write();
- }
-
- private BufferedWriter writer(Context withinContext, Partition qlPtn)
- throws IOException {
- Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
- FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
- Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
- return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
- }
-
- @Override
- public DumpType dumpType() {
- return DumpType.EVENT_ADD_PARTITION;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
deleted file mode 100644
index b6c3496..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class AlterPartitionHandler extends AbstractHandler {
- private final org.apache.hadoop.hive.metastore.api.Partition after;
- private final org.apache.hadoop.hive.metastore.api.Table tableObject;
- private final boolean isTruncateOp;
- private final Scenario scenario;
-
- AlterPartitionHandler(NotificationEvent event) throws Exception {
- super(event);
- AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
- tableObject = apm.getTableObj();
- org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
- after = apm.getPtnObjAfter();
- isTruncateOp = apm.getIsTruncateOp();
- scenario = scenarioType(before, after);
- }
-
- private enum Scenario {
- ALTER {
- @Override
- DumpType dumpType() {
- return DumpType.EVENT_ALTER_PARTITION;
- }
- },
- RENAME {
- @Override
- DumpType dumpType() {
- return DumpType.EVENT_RENAME_PARTITION;
- }
- },
- TRUNCATE {
- @Override
- DumpType dumpType() {
- return DumpType.EVENT_TRUNCATE_PARTITION;
- }
- };
-
- abstract DumpType dumpType();
- }
-
- private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
- org.apache.hadoop.hive.metastore.api.Partition after) {
- Iterator<String> beforeValIter = before.getValuesIterator();
- Iterator<String> afterValIter = after.getValuesIterator();
- while(beforeValIter.hasNext()) {
- if (!beforeValIter.next().equals(afterValIter.next())) {
- return Scenario.RENAME;
- }
- }
- return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
-
- if (Scenario.ALTER == scenario) {
- withinContext.replicationSpec.setIsMetadataOnly(true);
- Table qlMdTable = new Table(tableObject);
- List<Partition> partitions = new ArrayList<>();
- partitions.add(new Partition(qlMdTable, after));
- Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
- EximUtil.createExportDump(
- metaDataPath.getFileSystem(withinContext.hiveConf),
- metaDataPath,
- qlMdTable,
- partitions,
- withinContext.replicationSpec);
- }
- DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
- dmd.write();
- }
-
- @Override
- public DumpType dumpType() {
- return scenario.dumpType();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
deleted file mode 100644
index d553240..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class AlterTableHandler extends AbstractHandler {
- private final org.apache.hadoop.hive.metastore.api.Table after;
- private final boolean isTruncateOp;
- private final Scenario scenario;
-
- private enum Scenario {
- ALTER {
- @Override
- DumpType dumpType() {
- return DumpType.EVENT_ALTER_TABLE;
- }
- },
- RENAME {
- @Override
- DumpType dumpType() {
- return DumpType.EVENT_RENAME_TABLE;
- }
- },
- TRUNCATE {
- @Override
- DumpType dumpType() {
- return DumpType.EVENT_TRUNCATE_TABLE;
- }
- };
-
- abstract DumpType dumpType();
- }
-
- AlterTableHandler(NotificationEvent event) throws Exception {
- super(event);
- AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
- org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore();
- after = atm.getTableObjAfter();
- isTruncateOp = atm.getIsTruncateOp();
- scenario = scenarioType(before, after);
- }
-
- private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
- org.apache.hadoop.hive.metastore.api.Table after) {
- if (before.getDbName().equals(after.getDbName())
- && before.getTableName().equals(after.getTableName())) {
- return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
- } else {
- return Scenario.RENAME;
- }
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- {
- LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
- if (Scenario.ALTER == scenario) {
- withinContext.replicationSpec.setIsMetadataOnly(true);
- Table qlMdTableAfter = new Table(after);
- Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
- EximUtil.createExportDump(
- metaDataPath.getFileSystem(withinContext.hiveConf),
- metaDataPath,
- qlMdTableAfter,
- null,
- withinContext.replicationSpec);
- }
- DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
- dmd.write();
- }
- }
-
- @Override
- public DumpType dumpType() {
- return scenario.dumpType();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
deleted file mode 100644
index 88600fd..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public class CreateTableHandler extends AbstractHandler {
-
- CreateTableHandler(NotificationEvent event) {
- super(event);
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
- LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
- org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
-
- if (tobj == null) {
- LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed");
- return;
- }
-
- Table qlMdTable = new Table(tobj);
- if (qlMdTable.isView()) {
- withinContext.replicationSpec.setIsMetadataOnly(true);
- }
-
- Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
- EximUtil.createExportDump(
- metaDataPath.getFileSystem(withinContext.hiveConf),
- metaDataPath,
- qlMdTable,
- null,
- withinContext.replicationSpec);
-
- Path dataPath = new Path(withinContext.eventRoot, "data");
- Iterable<String> files = ctm.getFiles();
- if (files != null) {
- // encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
- for (String file : files) {
- fileListWriter.write(file + "\n");
- }
- }
- }
- withinContext.createDmd(this).write();
- }
-
- private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
- FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
- Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
- return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
- }
-
- @Override
- public DumpType dumpType() {
- return DumpType.EVENT_CREATE_TABLE;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
deleted file mode 100644
index 78cd74f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class DefaultHandler extends AbstractHandler {
-
- DefaultHandler(NotificationEvent event) {
- super(event);
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
- DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
- dmd.write();
- }
-
- @Override
- public DumpType dumpType() {
- return DumpType.EVENT_UNKNOWN;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
deleted file mode 100644
index c4a0908..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class DropPartitionHandler extends AbstractHandler {
-
- DropPartitionHandler(NotificationEvent event) {
- super(event);
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
- DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
- dmd.write();
- }
-
- @Override
- public DumpType dumpType() {
- return DumpType.EVENT_DROP_PARTITION;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
deleted file mode 100644
index e3addaf..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class DropTableHandler extends AbstractHandler {
-
- DropTableHandler(NotificationEvent event) {
- super(event);
- }
-
- @Override
- public void handle(Context withinContext) throws Exception {
- LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
- DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
- dmd.write();
- }
-
- @Override
- public DumpType dumpType() {
- return DumpType.EVENT_DROP_TABLE;
- }
-}