Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/07 22:43:24 UTC

[1/2] hive git commit: HIVE-16268 : enable incremental repl dump to handle functions metadata (Anishek Agarwal, reviewed by Sushanth Sowmyan)

Repository: hive
Updated Branches:
  refs/heads/master 699d6ce36 -> 9d4f13afd


http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
deleted file mode 100644
index 29f3b42..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public interface EventHandler {
-  void handle(Context withinContext) throws Exception;
-
-  long fromEventId();
-
-  long toEventId();
-
-  DumpType dumpType();
-
-  class Context {
-    final Path eventRoot, cmRoot;
-    final Hive db;
-    final HiveConf hiveConf;
-    final ReplicationSpec replicationSpec;
-
-    public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
-        ReplicationSpec replicationSpec) {
-      this.eventRoot = eventRoot;
-      this.cmRoot = cmRoot;
-      this.db = db;
-      this.hiveConf = hiveConf;
-      this.replicationSpec = replicationSpec;
-    }
-
-    DumpMetaData createDmd(EventHandler eventHandler) {
-      return new DumpMetaData(
-          eventRoot,
-          eventHandler.dumpType(),
-          eventHandler.fromEventId(),
-          eventHandler.toEventId(),
-          cmRoot, hiveConf
-      );
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
deleted file mode 100644
index 53adea8..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Modifier;
-import java.util.HashMap;
-import java.util.Map;
-
-public class EventHandlerFactory {
-  private EventHandlerFactory() {
-  }
-
-  private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>();
-
-  static {
-    register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
-    register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
-    register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
-    register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
-    register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
-    register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
-    register(MessageFactory.INSERT_EVENT, InsertHandler.class);
-  }
-
-  static void register(String event, Class<? extends EventHandler> handlerClazz) {
-    try {
-      Constructor<? extends EventHandler> constructor =
-          handlerClazz.getDeclaredConstructor(NotificationEvent.class);
-      assert constructor != null;
-      assert !Modifier.isPrivate(constructor.getModifiers());
-      registeredHandlers.put(event, handlerClazz);
-    } catch (NoSuchMethodException e) {
-      throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
-          + " does not have a constructor with a single parameter of type: "
-          + NotificationEvent.class.getCanonicalName(), e);
-    }
-  }
-
-  public static EventHandler handlerFor(NotificationEvent event) {
-    if (registeredHandlers.containsKey(event.getEventType())) {
-      Class<? extends EventHandler> handlerClazz = registeredHandlers.get(event.getEventType());
-      try {
-        Constructor<? extends EventHandler> constructor =
-            handlerClazz.getDeclaredConstructor(NotificationEvent.class);
-        return constructor.newInstance(event);
-      } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
-        // This should never happen; register() validated the constructor. Propagate regardless.
-        throw new RuntimeException(
-            "failed when creating handler for " + event.getEventType()
-                + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
-      }
-    }
-    return new DefaultHandler(event);
-  }
-}

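Both the factory removed above (recreated under dump/events by this commit) and the load-side
MessageHandlerFactory added below follow the same reflective registration pattern: register()
verifies up front that the handler class exposes the expected constructor, so a bad registration
fails fast, and handlerFor() falls back to a no-op default for unregistered event types. A
minimal, self-contained sketch of that pattern, with Handler and Registry as illustrative
stand-ins rather than Hive classes:

import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

interface Handler {
  void handle(String payload) throws Exception;
}

final class Registry {
  private static final Map<String, Class<? extends Handler>> handlers = new HashMap<>();

  // Validate at registration time that the handler can be reflectively built later.
  static void register(String eventType, Class<? extends Handler> clazz) {
    try {
      Constructor<? extends Handler> c = clazz.getDeclaredConstructor(String.class);
      if (Modifier.isPrivate(c.getModifiers())) {
        throw new IllegalArgumentException(clazz.getName() + " has a private constructor");
      }
      handlers.put(eventType, clazz);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(clazz.getName() + " lacks a (String) constructor", e);
    }
  }

  static Handler handlerFor(String eventType, String payload) {
    Class<? extends Handler> clazz = handlers.get(eventType);
    if (clazz == null) {
      return p -> { };  // unregistered event types get a no-op default handler
    }
    try {
      return clazz.getDeclaredConstructor(String.class).newInstance(payload);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("failed to create handler for " + eventType, e);
    }
  }

  public static void main(String[] args) throws Exception {
    Registry.handlerFor("unknown_event", "{}").handle("{}");  // resolves to the no-op default
  }
}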
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
deleted file mode 100644
index 910b396..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.thrift.TException;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class InsertHandler extends AbstractHandler {
-
-  InsertHandler(NotificationEvent event) {
-    super(event);
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
-    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
-    Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
-    List<Partition> qlPtns = null;
-    if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
-      qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
-    }
-    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-
-    // Mark the replace type based on whether this is an INSERT INTO or an INSERT OVERWRITE operation
-    withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
-    EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
-        qlMdTable, qlPtns,
-        withinContext.replicationSpec);
-    Iterable<String> files = insertMsg.getFiles();
-
-    if (files != null) {
-      Path dataPath;
-      if ((null == qlPtns) || qlPtns.isEmpty()) {
-        dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
-      } else {
-        /*
-         * An insert into/overwrite operation may write to one or more partitions, even across
-         * multiple tables, but an insert event is generated for each partition written to, so
-         * the qlPtns list will have exactly one entry.
-         */
-        assert(1 == qlPtns.size());
-        dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
-      }
-
-      // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          fileListWriter.write(file + "\n");
-        }
-      }
-    }
-
-    LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
-    DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
-    dmd.write();
-  }
-
-  private org.apache.hadoop.hive.ql.metadata.Table tableObject(
-      Context withinContext, InsertMessage insertMsg) throws TException {
-    return new org.apache.hadoop.hive.ql.metadata.Table(
-        withinContext.db.getMSC().getTable(
-            insertMsg.getDB(), insertMsg.getTable()
-        )
-    );
-  }
-
-  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
-    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return DumpType.EVENT_INSERT;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
new file mode 100644
index 0000000..95e51e4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+abstract class AbstractMessageHandler implements MessageHandler {
+  final HashSet<ReadEntity> readEntitySet = new HashSet<>();
+  final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
+  final Map<String, Long> tablesUpdated = new HashMap<>(),
+      databasesUpdated = new HashMap<>();
+  final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+
+  @Override
+  public Set<ReadEntity> readEntities() {
+    return readEntitySet;
+  }
+
+  @Override
+  public Set<WriteEntity> writeEntities() {
+    return writeEntitySet;
+  }
+
+  @Override
+  public Map<String, Long> tablesUpdated() {
+    return tablesUpdated;
+  }
+
+  @Override
+  public Map<String, Long> databasesUpdated() {
+    return databasesUpdated;
+  }
+
+  ReplicationSpec eventOnlyReplicationSpec(Context forContext) throws SemanticException {
+    String eventId = forContext.dmd.getEventTo().toString();
+    return replicationSpec(eventId, eventId);
+  }
+
+  private ReplicationSpec replicationSpec(String fromId, String toId) throws SemanticException {
+    return new ReplicationSpec(true, false, fromId, toId, false, true, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
new file mode 100644
index 0000000..6d346b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+class DefaultHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    return new ArrayList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
new file mode 100644
index 0000000..73f2613
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class DropPartitionHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    try {
+      DropPartitionMessage msg = deserializer.getDropPartitionMessage(context.dmd.getPayload());
+      String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+      String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+      Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
+          genPartSpecs(new Table(msg.getTableObj()),
+              msg.getPartitions());
+      if (partSpecs.size() > 0) {
+        DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName,
+            partSpecs, null, true, eventOnlyReplicationSpec(context));
+        Task<DDLWork> dropPtnTask = TaskFactory.get(
+            new DDLWork(readEntitySet, writeEntitySet, dropPtnDesc),
+            context.hiveConf
+        );
+        context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
+            dropPtnDesc.getTableName(), msg.getPartitions());
+        databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+        tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+        return Collections.singletonList(dropPtnTask);
+      } else {
+        throw new SemanticException(
+            "DROP PARTITION event did not yield any partition descriptors for event message: "
+                + context.dmd.getPayload());
+      }
+    } catch (Exception e) {
+      throw (e instanceof SemanticException)
+          ? (SemanticException) e
+          : new SemanticException("Error reading message members", e);
+    }
+  }
+
+  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
+      List<Map<String, String>> partitions) throws SemanticException {
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
+    int partPrefixLength = 0;
+    if (partitions.size() > 0) {
+      partPrefixLength = partitions.get(0).size();
+      // Pick the length of the first partition; all listed partitions are expected to have the
+      // same number of key-value pairs.
+    }
+    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
+    for (Map<String, String> ptn : partitions) {
+      // convert each key-value-map to appropriate expression.
+      ExprNodeGenericFuncDesc expr = null;
+      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+        String key = kvp.getKey();
+        Object val = kvp.getValue();
+        String type = table.getPartColByName(key).getType();
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
+            "=", column, new ExprNodeConstantDesc(pti, val));
+        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+      }
+      if (expr != null) {
+        partitionDesc.add(expr);
+      }
+    }
+    if (partitionDesc.size() > 0) {
+      partSpecs.put(partPrefixLength, partitionDesc);
+    }
+    return partSpecs;
+  }
+}

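genPartSpecs above folds each dropped partition's key-value map into a single conjunctive
equality predicate. A self-contained sketch of that fold, using plain strings in place of Hive's
ExprNode classes (the class and method names here are illustrative):

import java.util.LinkedHashMap;
import java.util.Map;

final class PartitionPredicateSketch {
  static String predicateFor(Map<String, String> partSpec) {
    String expr = null;
    for (Map.Entry<String, String> kvp : partSpec.entrySet()) {
      String op = "(" + kvp.getKey() + " = '" + kvp.getValue() + "')";
      expr = (expr == null) ? op : "(" + expr + " and " + op + ")";  // left fold with AND
    }
    return expr;
  }

  public static void main(String[] args) {
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("ds", "2017-05-04");
    spec.put("hr", "11");
    // Prints: ((ds = '2017-05-04') and (hr = '11'))
    System.out.println(predicateFor(spec));
  }
}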
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
new file mode 100644
index 0000000..b623f2f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+class DropTableHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    DropTableMessage msg = deserializer.getDropTableMessage(context.dmd.getPayload());
+    String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+    String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+    DropTableDesc dropTableDesc = new DropTableDesc(
+        actualDbName + "." + actualTblName,
+        null, true, true,
+        eventOnlyReplicationSpec(context));
+    Task<DDLWork> dropTableTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, dropTableDesc),
+        context.hiveConf
+    );
+    context.log
+        .debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    return Collections.singletonList(dropTableTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
new file mode 100644
index 0000000..fa63169
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.List;
+
+class InsertHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload());
+    String actualDbName =
+        withinContext.isDbNameEmpty() ? insertMessage.getDB() : withinContext.dbName;
+    String actualTblName =
+        withinContext.isTableNameEmpty() ? insertMessage.getTable() : withinContext.tableName;
+
+    Context currentContext = new Context(withinContext, actualDbName, actualTblName);
+    // Piggybacking in Import logic for now
+    TableHandler tableHandler = new TableHandler();
+    List<Task<? extends Serializable>> tasks = tableHandler.handle(currentContext);
+    readEntitySet.addAll(tableHandler.readEntities());
+    writeEntitySet.addAll(tableHandler.writeEntities());
+    databasesUpdated.putAll(tableHandler.databasesUpdated);
+    tablesUpdated.putAll(tableHandler.tablesUpdated);
+    return tasks;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
new file mode 100644
index 0000000..840f95e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+public interface MessageHandler {
+
+  List<Task<? extends Serializable>> handle(Context withinContext) throws SemanticException;
+
+  Set<ReadEntity> readEntities();
+
+  Set<WriteEntity> writeEntities();
+
+  Map<String, Long> tablesUpdated();
+
+  Map<String, Long> databasesUpdated();
+
+  class Context {
+    final String dbName, tableName, location;
+    final Task<? extends Serializable> precursor;
+    DumpMetaData dmd;
+    final HiveConf hiveConf;
+    final Hive db;
+    final org.apache.hadoop.hive.ql.Context nestedContext;
+    final Logger log;
+
+    public Context(String dbName, String tableName, String location,
+        Task<? extends Serializable> precursor, DumpMetaData dmd, HiveConf hiveConf,
+        Hive db, org.apache.hadoop.hive.ql.Context nestedContext, Logger log) {
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.location = location;
+      this.precursor = precursor;
+      this.dmd = dmd;
+      this.hiveConf = hiveConf;
+      this.db = db;
+      this.nestedContext = nestedContext;
+      this.log = log;
+    }
+
+    public Context(Context other, String dbName, String tableName) {
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.location = other.location;
+      this.precursor = other.precursor;
+      this.dmd = other.dmd;
+      this.hiveConf = other.hiveConf;
+      this.db = other.db;
+      this.nestedContext = other.nestedContext;
+      this.log = other.log;
+    }
+
+    boolean isTableNameEmpty() {
+      return StringUtils.isEmpty(tableName);
+    }
+
+    boolean isDbNameEmpty() {
+      return StringUtils.isEmpty(dbName);
+    }
+  }
+}

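The second Context constructor above is a copy-with-overrides pattern: a handler that resolves
the effective db/table names (as the load-side InsertHandler earlier in this patch does) derives
a fresh context rather than mutating the shared one. A stripped-down sketch of the pattern (Ctx
is illustrative, not the Hive class):

final class Ctx {
  final String dbName, tableName, location;

  Ctx(String dbName, String tableName, String location) {
    this.dbName = dbName;
    this.tableName = tableName;
    this.location = location;
  }

  // Copy every field from 'other', overriding only the resolved names.
  Ctx(Ctx other, String dbName, String tableName) {
    this(dbName, tableName, other.location);
  }
}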
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
new file mode 100644
index 0000000..de6ff74
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+public class MessageHandlerFactory {
+  private static Map<DumpType, Class<? extends MessageHandler>> messageHandlers = new HashMap<>();
+
+  static {
+    register(DumpType.EVENT_DROP_PARTITION, DropPartitionHandler.class);
+    register(DumpType.EVENT_DROP_TABLE, DropTableHandler.class);
+    register(DumpType.EVENT_INSERT, InsertHandler.class);
+    register(DumpType.EVENT_RENAME_PARTITION, RenamePartitionHandler.class);
+    register(DumpType.EVENT_RENAME_TABLE, RenameTableHandler.class);
+
+    register(DumpType.EVENT_CREATE_TABLE, TableHandler.class);
+    register(DumpType.EVENT_ADD_PARTITION, TableHandler.class);
+    register(DumpType.EVENT_ALTER_TABLE, TableHandler.class);
+    register(DumpType.EVENT_ALTER_PARTITION, TableHandler.class);
+
+    register(DumpType.EVENT_TRUNCATE_PARTITION, TruncatePartitionHandler.class);
+    register(DumpType.EVENT_TRUNCATE_TABLE, TruncateTableHandler.class);
+  }
+
+  private static void register(DumpType eventType, Class<? extends MessageHandler> handlerClazz) {
+    try {
+      Constructor<? extends MessageHandler> constructor =
+          handlerClazz.getDeclaredConstructor();
+      assert constructor != null;
+      assert !Modifier.isPrivate(constructor.getModifiers());
+      messageHandlers.put(eventType, handlerClazz);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+          + " does not have the no-argument constructor required to instantiate it"
+          + " via this factory", e);
+    }
+  }
+
+  public static MessageHandler handlerFor(DumpType eventType) {
+    if (messageHandlers.containsKey(eventType)) {
+      Class<? extends MessageHandler> handlerClazz = messageHandlers.get(eventType);
+      try {
+        Constructor<? extends MessageHandler> constructor =
+            handlerClazz.getDeclaredConstructor();
+        return constructor.newInstance();
+      } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+        // This should never happen; register() validated the constructor. Propagate regardless.
+        throw new RuntimeException(
+            "failed when creating handler for " + eventType
+                + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
+      }
+    }
+    return new DefaultHandler();
+  }
+}

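During an incremental REPL LOAD the dispatch reduces to a lookup on the dump type recorded in
the event's DumpMetaData. An illustrative fragment of that flow (the dmd and context variables,
and the dmd.getDumpType() accessor, are assumptions, not shown in this patch):

MessageHandler handler = MessageHandlerFactory.handlerFor(dmd.getDumpType());
List<Task<? extends Serializable>> tasks = handler.handle(context);
// Event types with no registered handler resolve to DefaultHandler, which returns an
// empty task list, so unknown events are skipped rather than failing the load.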
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
new file mode 100644
index 0000000..658f2ba
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class RenamePartitionHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+
+    AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
+    String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+    String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+
+    Map<String, String> newPartSpec = new LinkedHashMap<>();
+    Map<String, String> oldPartSpec = new LinkedHashMap<>();
+    String tableName = actualDbName + "." + actualTblName;
+    try {
+      Table tblObj = msg.getTableObj();
+      Iterator<String> beforeIterator = msg.getPtnObjBefore().getValuesIterator();
+      Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
+      for (FieldSchema fs : tblObj.getPartitionKeys()) {
+        oldPartSpec.put(fs.getName(), beforeIterator.next());
+        newPartSpec.put(fs.getName(), afterIterator.next());
+      }
+    } catch (Exception e) {
+      throw (e instanceof SemanticException)
+          ? (SemanticException) e
+          : new SemanticException("Error reading message members", e);
+    }
+
+    RenamePartitionDesc renamePtnDesc =
+        new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec);
+    Task<DDLWork> renamePtnTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf
+    );
+    context.log
+        .debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec,
+            newPartSpec);
+    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    tablesUpdated.put(tableName, context.dmd.getEventTo());
+    return Collections.singletonList(renamePtnTask);
+  }
+}

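The rename handler above rebuilds the old and new partition specs by walking the table's
partition keys in order while consuming the before/after value iterators in lock-step. A
self-contained sketch of that zip (names illustrative):

import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartSpecZipSketch {
  static Map<String, String> zip(List<String> keys, Iterator<String> values) {
    Map<String, String> spec = new LinkedHashMap<>();  // preserve partition-key order
    for (String key : keys) {
      spec.put(key, values.next());
    }
    return spec;
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("ds", "hr");
    Map<String, String> oldSpec = zip(keys, Arrays.asList("2017-05-04", "10").iterator());
    Map<String, String> newSpec = zip(keys, Arrays.asList("2017-05-04", "11").iterator());
    System.out.println(oldSpec + " -> " + newSpec);  // {ds=2017-05-04, hr=10} -> {ds=2017-05-04, hr=11}
  }
}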
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
new file mode 100644
index 0000000..2c429c1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+class RenameTableHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+
+    AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload());
+    if (!context.isTableNameEmpty()) {
+      throw new SemanticException(
+          "RENAMES of tables are not supported for table-level replication");
+    }
+    try {
+      String oldDbName = msg.getTableObjBefore().getDbName();
+      String newDbName = msg.getTableObjAfter().getDbName();
+
+      if (!context.isDbNameEmpty()) {
+        // If we're loading into a db, instead of into the warehouse, then the oldDbName and
+        // newDbName must be the same
+        if (!oldDbName.equalsIgnoreCase(newDbName)) {
+          throw new SemanticException("Cannot replicate an event renaming a table across"
+              + " databases into a db level load " + oldDbName + "->" + newDbName);
+        } else {
+          // both were the same, and can be replaced by the new db we're loading into.
+          oldDbName = context.dbName;
+          newDbName = context.dbName;
+        }
+      }
+
+      String oldName = oldDbName + "." + msg.getTableObjBefore().getTableName();
+      String newName = newDbName + "." + msg.getTableObjAfter().getTableName();
+      AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false);
+      Task<DDLWork> renameTableTask = TaskFactory.get(
+          new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf
+      );
+      context.log.debug(
+          "Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName
+      );
+      // oldDbName and newDbName *will* be the same if we're here
+      databasesUpdated.put(newDbName, context.dmd.getEventTo());
+      tablesUpdated.remove(oldName);
+      tablesUpdated.put(newName, context.dmd.getEventTo());
+      // Note: there is an edge case here in the interaction with table-level REPL LOAD, which
+      // nukes out tablesUpdated. However, we explicitly don't support repl of that sort, and
+      // error out above if so. If that should ever change, this will need reworking.
+      return Collections.singletonList(renameTableTask);
+    } catch (Exception e) {
+      throw (e instanceof SemanticException)
+          ? (SemanticException) e
+          : new SemanticException("Error reading message members", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
new file mode 100644
index 0000000..2db8385
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+class TableHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+    // The path passed to us is a table dump location, which we go ahead and load as needed.
+    // If tblName is null, we default to the table name specified in _metadata; if both are
+    // specified, the supplied name is what the new table is created as.
+    if (context.isDbNameEmpty()) {
+      throw new SemanticException("Database name cannot be null for a table load");
+    }
+    try {
+      // TODO: these could be static or inlined; there seems to be no way they are updated here.
+
+      // no location set on repl loads
+      boolean isLocationSet = false;
+      // all repl imports are non-external
+      boolean isExternalSet = false;
+      // bootstrap loads are not partition level
+      boolean isPartSpecSet = false;
+      // repl loads are not partition level
+      LinkedHashMap<String, String> parsedPartSpec = null;
+      // no location for repl imports
+      String parsedLocation = null;
+      List<Task<? extends Serializable>> importTasks = new ArrayList<>();
+
+      EximUtil.SemanticAnalyzerWrapperContext x =
+          new EximUtil.SemanticAnalyzerWrapperContext(
+              context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log,
+              context.nestedContext);
+      ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
+          (context.precursor != null), parsedLocation, context.tableName, context.dbName,
+          parsedPartSpec, context.location, x,
+          databasesUpdated, tablesUpdated);
+
+      return importTasks;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
new file mode 100644
index 0000000..5436f0d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class TruncatePartitionHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+    AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
+    String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+    String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+
+    Map<String, String> partSpec = new LinkedHashMap<>();
+    try {
+      org.apache.hadoop.hive.metastore.api.Table tblObj = msg.getTableObj();
+      Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
+      for (FieldSchema fs : tblObj.getPartitionKeys()) {
+        partSpec.put(fs.getName(), afterIterator.next());
+      }
+    } catch (Exception e) {
+      if (e instanceof SemanticException) {
+        throw (SemanticException) e;
+      } else {
+        throw new SemanticException("Error reading message members", e);
+      }
+    }
+
+    TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+        actualDbName + "." + actualTblName, partSpec);
+    Task<DDLWork> truncatePtnTask =
+        TaskFactory.get(
+            new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc),
+            context.hiveConf
+        );
+    context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(),
+        truncateTableDesc.getTableName());
+    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    return Collections.singletonList(truncatePtnTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
new file mode 100644
index 0000000..731383c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+class TruncateTableHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+    AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload());
+    String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+    String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
+
+    TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+        actualDbName + "." + actualTblName, null);
+    Task<DDLWork> truncateTableTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc),
+        context.hiveConf
+    );
+
+    context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(),
+        truncateTableDesc.getTableName());
+    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    return Collections.singletonList(truncateTableTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
new file mode 100644
index 0000000..c689e6f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestEventHandlerFactory {
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
+    class NonCompatibleEventHandler implements EventHandler {
+      @Override
+      public void handle(Context withinContext) throws Exception {
+
+      }
+
+      @Override
+      public long fromEventId() {
+        return 0;
+      }
+
+      @Override
+      public long toEventId() {
+        return 0;
+      }
+
+      @Override
+      public DumpType dumpType() {
+        return null;
+      }
+    }
+    EventHandlerFactory.register("anyEvent", NonCompatibleEventHandler.class);
+  }
+
+  @Test
+  public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
+    EventHandler eventHandler =
+        EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
+            "shouldGiveDefaultHandler", "s"));
+    assertTrue(eventHandler instanceof DefaultHandler);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
deleted file mode 100644
index 4b802c4..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestEventHandlerFactory {
-  @Test(expected = IllegalArgumentException.class)
-  public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
-    class NonCompatibleEventHandler implements EventHandler {
-      @Override
-      public void handle(Context withinContext) throws Exception {
-
-      }
-
-      @Override
-      public long fromEventId() {
-        return 0;
-      }
-
-      @Override
-      public long toEventId() {
-        return 0;
-      }
-
-      @Override
-      public DumpType dumpType() {
-        return null;
-      }
-    }
-    EventHandlerFactory.register("anyEvent", NonCompatibleEventHandler.class);
-  }
-
-  @Test
-  public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
-    EventHandler eventHandler =
-        EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
-            "shouldGiveDefaultHandler", "s"));
-    assertTrue(eventHandler instanceof DefaultHandler);
-  }
-
-}
\ No newline at end of file


[2/2] hive git commit: HIVE-16268 : enable incremental repl dump to handle functions metadata (Anishek Agarwal, reviewed by Sushanth Sowmyan)

Posted by kh...@apache.org.
HIVE-16268 : enable incremental repl dump to handle functions metadata (Anishek Agarwal, reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9d4f13af
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9d4f13af
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9d4f13af

Branch: refs/heads/master
Commit: 9d4f13afda34250b7cf722287a557426e90ff24d
Parents: 699d6ce
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Thu May 4 02:48:27 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Sun May 7 15:43:21 2017 -0700

----------------------------------------------------------------------
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 306 ++-----------------
 .../hadoop/hive/ql/parse/repl/DumpType.java     |   1 +
 .../parse/repl/dump/events/AbstractHandler.java |  46 +++
 .../repl/dump/events/AddPartitionHandler.java   | 114 +++++++
 .../repl/dump/events/AlterPartitionHandler.java | 112 +++++++
 .../repl/dump/events/AlterTableHandler.java     | 102 +++++++
 .../repl/dump/events/CreateFunctionHandler.java |  53 +++
 .../repl/dump/events/CreateTableHandler.java    |  86 ++++++
 .../parse/repl/dump/events/DefaultHandler.java  |  44 +++
 .../repl/dump/events/DropPartitionHandler.java  |  44 +++
 .../repl/dump/events/DropTableHandler.java      |  44 +++
 .../ql/parse/repl/dump/events/EventHandler.java |  62 ++++
 .../repl/dump/events/EventHandlerFactory.java   |  76 +++++
 .../parse/repl/dump/events/InsertHandler.java   | 110 +++++++
 .../ql/parse/repl/events/AbstractHandler.java   |  46 ---
 .../parse/repl/events/AddPartitionHandler.java  | 114 -------
 .../repl/events/AlterPartitionHandler.java      | 112 -------
 .../ql/parse/repl/events/AlterTableHandler.java | 102 -------
 .../parse/repl/events/CreateTableHandler.java   |  86 ------
 .../ql/parse/repl/events/DefaultHandler.java    |  44 ---
 .../parse/repl/events/DropPartitionHandler.java |  44 ---
 .../ql/parse/repl/events/DropTableHandler.java  |  44 ---
 .../hive/ql/parse/repl/events/EventHandler.java |  62 ----
 .../parse/repl/events/EventHandlerFactory.java  |  75 -----
 .../ql/parse/repl/events/InsertHandler.java     | 110 -------
 .../load/message/AbstractMessageHandler.java    |  67 ++++
 .../parse/repl/load/message/DefaultHandler.java |  33 ++
 .../repl/load/message/DropPartitionHandler.java | 108 +++++++
 .../repl/load/message/DropTableHandler.java     |  51 ++++
 .../parse/repl/load/message/InsertHandler.java  |  47 +++
 .../parse/repl/load/message/MessageHandler.java |  91 ++++++
 .../load/message/MessageHandlerFactory.java     |  79 +++++
 .../load/message/RenamePartitionHandler.java    |  74 +++++
 .../repl/load/message/RenameTableHandler.java   |  81 +++++
 .../parse/repl/load/message/TableHandler.java   |  68 +++++
 .../load/message/TruncatePartitionHandler.java  |  69 +++++
 .../repl/load/message/TruncateTableHandler.java |  50 +++
 .../dump/events/TestEventHandlerFactory.java    |  62 ++++
 .../repl/events/TestEventHandlerFactory.java    |  62 ----
 39 files changed, 1798 insertions(+), 1183 deletions(-)
----------------------------------------------------------------------
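
At a glance, the patch replaces the monolithic event switch in ReplicationSemanticAnalyzer with two parallel handler hierarchies, one per replication direction (a rough map; the concrete signatures appear in the hunks below):

    repl dump:  EventHandlerFactory.handlerFor(NotificationEvent)
                    -> EventHandler.handle(Context)     writes the per-event dump dir + DumpMetaData
    repl load:  MessageHandlerFactory.handlerFor(DumpType)
                    -> MessageHandler.handle(Context)   returns the List<Task<? extends Serializable>> to execute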


http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 2daa123..5d1d2fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -27,22 +27,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
-import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -53,29 +46,23 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
-import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandlerFactory;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
-import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,7 +74,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -681,270 +667,26 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private List<Task<? extends Serializable>> analyzeEventLoad(
-      String dbName, String tblName, String locn,
-      Task<? extends Serializable> precursor,
-      Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated,
-      DumpMetaData dmd) throws SemanticException {
-    MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
-    switch (dmd.getDumpType()) {
-      case EVENT_CREATE_TABLE: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-      }
-      case EVENT_ADD_PARTITION: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-      }
-      case EVENT_DROP_TABLE: {
-        DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload());
-        String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropTableMessage.getDB() : dbName);
-        String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropTableMessage.getTable() : tblName);
-        DropTableDesc dropTableDesc = new DropTableDesc(
-            actualDbName + "." + actualTblName,
-            null, true, true,
-            getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
-        Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf);
-        if (precursor != null){
-          precursor.addDependentTask(dropTableTask);
-        }
-        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-        tasks.add(dropTableTask);
-        LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
-        dbsUpdated.put(actualDbName,dmd.getEventTo());
-        return tasks;
-      }
-      case EVENT_DROP_PARTITION: {
-        try {
-          DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
-          String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropPartitionMessage.getDB() : dbName);
-          String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropPartitionMessage.getTable() : tblName);
-          Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs;
-          partSpecs =
-              genPartSpecs(new Table(dropPartitionMessage.getTableObj()),
-                  dropPartitionMessage.getPartitions());
-          if (partSpecs.size() > 0) {
-            DropTableDesc dropPtnDesc = new DropTableDesc(
-                actualDbName + "." + actualTblName,
-                partSpecs, null, true,
-                getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
-            Task<DDLWork> dropPtnTask =
-                TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
-            if (precursor != null) {
-              precursor.addDependentTask(dropPtnTask);
-            }
-            List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-            tasks.add(dropPtnTask);
-            LOG.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
-                dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
-            dbsUpdated.put(actualDbName, dmd.getEventTo());
-            tablesUpdated.put(actualDbName + "." + actualTblName, dmd.getEventTo());
-            return tasks;
-          } else {
-            throw new SemanticException(
-                "DROP PARTITION EVENT does not return any part descs for event message :"
-                    + dmd.getPayload());
-          }
-        } catch (Exception e) {
-          if (!(e instanceof SemanticException)){
-            throw new SemanticException("Error reading message members", e);
-          } else {
-            throw (SemanticException)e;
-          }
-        }
-      }
-      case EVENT_ALTER_TABLE: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-      }
-      case EVENT_RENAME_TABLE: {
-        AlterTableMessage renameTableMessage = md.getAlterTableMessage(dmd.getPayload());
-        if ((tblName != null) && (!tblName.isEmpty())){
-          throw new SemanticException("RENAMES of tables are not supported for table-level replication");
-        }
-        try {
-          String oldDbName = renameTableMessage.getTableObjBefore().getDbName();
-          String newDbName = renameTableMessage.getTableObjAfter().getDbName();
-
-          if ((dbName != null) && (!dbName.isEmpty())){
-            // If we're loading into a db, instead of into the warehouse, then the oldDbName and
-            // newDbName must be the same
-            if (!oldDbName.equalsIgnoreCase(newDbName)){
-              throw new SemanticException("Cannot replicate an event renaming a table across"
-                  + " databases into a db level load " + oldDbName +"->" + newDbName);
-            } else {
-              // both were the same, and can be replaced by the new db we're loading into.
-              oldDbName = dbName;
-              newDbName = dbName;
-            }
-          }
-
-          String oldName = oldDbName + "." + renameTableMessage.getTableObjBefore().getTableName();
-          String newName = newDbName + "." + renameTableMessage.getTableObjAfter().getTableName();
-          AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false);
-          Task<DDLWork> renameTableTask = TaskFactory.get(new DDLWork(inputs, outputs, renameTableDesc), conf);
-          if (precursor != null){
-            precursor.addDependentTask(renameTableTask);
-          }
-          List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-          tasks.add(renameTableTask);
-          LOG.debug("Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName);
-          dbsUpdated.put(newDbName, dmd.getEventTo()); // oldDbName and newDbName *will* be the same if we're here
-          tablesUpdated.remove(oldName);
-          tablesUpdated.put(newName, dmd.getEventTo());
-          // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated
-          // However, we explicitly don't support repl of that sort, and error out above if so. If that should
-          // ever change, this will need reworking.
-          return tasks;
-        } catch (Exception e) {
-          if (!(e instanceof SemanticException)){
-            throw new SemanticException("Error reading message members", e);
-          } else {
-            throw (SemanticException)e;
-          }
-        }
-      }
-      case EVENT_TRUNCATE_TABLE: {
-        AlterTableMessage truncateTableMessage = md.getAlterTableMessage(dmd.getPayload());
-        String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncateTableMessage.getDB() : dbName);
-        String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncateTableMessage.getTable() : tblName);
-
-        TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
-                actualDbName + "." + actualTblName, null);
-        Task<DDLWork> truncateTableTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
-        if (precursor != null) {
-          precursor.addDependentTask(truncateTableTask);
-        }
-
-        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-        tasks.add(truncateTableTask);
-        LOG.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName());
-        dbsUpdated.put(actualDbName,dmd.getEventTo());
-        return tasks;
-      }
-      case EVENT_ALTER_PARTITION: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-      }
-      case EVENT_RENAME_PARTITION: {
-        AlterPartitionMessage renamePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
-        String actualDbName = ((dbName == null) || dbName.isEmpty() ? renamePtnMessage.getDB() : dbName);
-        String actualTblName = ((tblName == null) || tblName.isEmpty() ? renamePtnMessage.getTable() : tblName);
-
-        Map<String, String> newPartSpec = new LinkedHashMap<String,String>();
-        Map<String, String> oldPartSpec = new LinkedHashMap<String,String>();
-        String tableName = actualDbName + "." + actualTblName;
-        try {
-          org.apache.hadoop.hive.metastore.api.Table tblObj = renamePtnMessage.getTableObj();
-          org.apache.hadoop.hive.metastore.api.Partition pobjBefore = renamePtnMessage.getPtnObjBefore();
-          org.apache.hadoop.hive.metastore.api.Partition pobjAfter = renamePtnMessage.getPtnObjAfter();
-          Iterator<String> beforeValIter = pobjBefore.getValuesIterator();
-          Iterator<String> afterValIter = pobjAfter.getValuesIterator();
-          for (FieldSchema fs : tblObj.getPartitionKeys()){
-            oldPartSpec.put(fs.getName(), beforeValIter.next());
-            newPartSpec.put(fs.getName(), afterValIter.next());
-          }
-        } catch (Exception e) {
-          if (!(e instanceof SemanticException)){
-            throw new SemanticException("Error reading message members", e);
-          } else {
-            throw (SemanticException)e;
-          }
-        }
-
-        RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec);
-        Task<DDLWork> renamePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, renamePtnDesc), conf);
-        if (precursor != null){
-          precursor.addDependentTask(renamePtnTask);
-        }
-        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-        tasks.add(renamePtnTask);
-        LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec);
-        dbsUpdated.put(actualDbName, dmd.getEventTo());
-        tablesUpdated.put(tableName, dmd.getEventTo());
-        return tasks;
-      }
-      case EVENT_TRUNCATE_PARTITION: {
-        AlterPartitionMessage truncatePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
-        String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncatePtnMessage.getDB() : dbName);
-        String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncatePtnMessage.getTable() : tblName);
-
-        Map<String, String> partSpec = new LinkedHashMap<String,String>();
-        try {
-          org.apache.hadoop.hive.metastore.api.Table tblObj = truncatePtnMessage.getTableObj();
-          org.apache.hadoop.hive.metastore.api.Partition pobjAfter = truncatePtnMessage.getPtnObjAfter();
-          Iterator<String> afterValIter = pobjAfter.getValuesIterator();
-          for (FieldSchema fs : tblObj.getPartitionKeys()){
-            partSpec.put(fs.getName(), afterValIter.next());
-          }
-        } catch (Exception e) {
-          if (!(e instanceof SemanticException)){
-            throw new SemanticException("Error reading message members", e);
-          } else {
-            throw (SemanticException)e;
-          }
-        }
-
-        TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
-                actualDbName + "." + actualTblName, partSpec);
-        Task<DDLWork> truncatePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
-        if (precursor != null) {
-          precursor.addDependentTask(truncatePtnTask);
-        }
-
-        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-        tasks.add(truncatePtnTask);
-        LOG.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName());
-        dbsUpdated.put(actualDbName,dmd.getEventTo());
-        return tasks;
-      }
-      case EVENT_INSERT: {
-        md = MessageFactory.getInstance().getDeserializer();
-        InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
-        String actualDbName = ((dbName == null) || dbName.isEmpty() ? insertMessage.getDB() : dbName);
-        String actualTblName = ((tblName == null) || tblName.isEmpty() ? insertMessage.getTable() : tblName);
-
-        // Piggybacking in Import logic for now
-        return analyzeTableLoad(actualDbName, actualTblName, locn, precursor, dbsUpdated, tablesUpdated);
-      }
-      case EVENT_UNKNOWN: {
-        break;
-      }
-      default: {
-        break;
-      }
-    }
-    return null;
-  }
-
-  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
-      List<Map<String, String>> partitions) throws SemanticException {
-    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
-        new HashMap<Integer, List<ExprNodeGenericFuncDesc>>();
-    int partPrefixLength = 0;
-    if ((partitions != null) && (partitions.size() > 0)) {
-      partPrefixLength = partitions.get(0).size();
-      // pick the length of the first ptn, we expect all ptns listed to have the same number of
-      // key-vals.
-    }
-    List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>();
-    for (Map<String, String> ptn : partitions) {
-      // convert each key-value-map to appropriate expression.
-      ExprNodeGenericFuncDesc expr = null;
-      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
-        String key = kvp.getKey();
-        Object val = kvp.getValue();
-        String type = table.getPartColByName(key).getType();
-        ;
-        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
-        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
-        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
-            "=", column, new ExprNodeConstantDesc(pti, val));
-        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
-      }
-      if (expr != null) {
-        ptnDescs.add(expr);
+      String dbName, String tblName, String locn, Task<? extends Serializable> precursor,
+      Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd)
+      throws SemanticException {
+    MessageHandler.Context context =
+        new MessageHandler.Context(dbName, tblName, locn, precursor, dmd, conf, db, ctx, LOG);
+    MessageHandler messageHandler = MessageHandlerFactory.handlerFor(dmd.getDumpType());
+    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
+
+    if (precursor != null) {
+      for (Task<? extends Serializable> t : tasks) {
+        precursor.addDependentTask(t);
+        LOG.debug("Added {}:{} as a precursor of {}:{}",
+            precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
       }
     }
-    if (ptnDescs.size() > 0) {
-      partSpecs.put(partPrefixLength, ptnDescs);
-    }
-    return partSpecs;
+    dbsUpdated.putAll(messageHandler.databasesUpdated());
+    tablesUpdated.putAll(messageHandler.tablesUpdated());
+    inputs.addAll(messageHandler.readEntities());
+    outputs.addAll(messageHandler.writeEntities());
+    return tasks;
   }
 
   private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException {
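
The accumulators drained at the end of analyzeEventLoad come from AbstractMessageHandler, whose body is not quoted in this mail; a minimal shape consistent with TruncateTableHandler's use of deserializer, readEntitySet, writeEntitySet and databasesUpdated would be:

    abstract class AbstractMessageHandler implements MessageHandler {
      final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
      final Set<ReadEntity> readEntitySet = new HashSet<>();
      final Set<WriteEntity> writeEntitySet = new HashSet<>();
      final Map<String, Long> databasesUpdated = new HashMap<>();
      final Map<String, Long> tablesUpdated = new HashMap<>();

      @Override public Set<ReadEntity> readEntities() { return readEntitySet; }
      @Override public Set<WriteEntity> writeEntities() { return writeEntitySet; }
      @Override public Map<String, Long> databasesUpdated() { return databasesUpdated; }
      @Override public Map<String, Long> tablesUpdated() { return tablesUpdated; }
    }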

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index b1df5a3..c2cffaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -31,6 +31,7 @@ public enum DumpType {
   EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
   EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
   EVENT_INSERT("EVENT_INSERT"),
+  EVENT_CREATE_FUNCTION("EVENT_CREATE_FUNCTION"),
   EVENT_UNKNOWN("EVENT_UNKNOWN");
 
   String type = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
new file mode 100644
index 0000000..ba699e3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractHandler implements EventHandler {
+  static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
+
+  final NotificationEvent event;
+  final MessageDeserializer deserializer;
+
+  AbstractHandler(NotificationEvent event) {
+    this.event = event;
+    deserializer = MessageFactory.getInstance().getDeserializer();
+  }
+
+  @Override
+  public long fromEventId() {
+    return event.getEventId();
+  }
+
+  @Override
+  public long toEventId() {
+    return event.getEventId();
+  }
+}
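
Dump-side handlers all follow the pattern this base class sets up: a non-private (NotificationEvent) constructor that EventHandlerFactory.register (at the end of this mail) verifies reflectively, plus handle() and dumpType(). A hypothetical handler for a new event type, sketched for illustration only:

    class MyEventHandler extends AbstractHandler {   // name and event type are illustrative
      MyEventHandler(NotificationEvent event) {
        super(event);
      }

      @Override
      public void handle(Context withinContext) throws Exception {
        // export whatever the event needs, then persist the event's dump metadata
        withinContext.createDmd(this).write();
      }

      @Override
      public DumpType dumpType() {
        return DumpType.EVENT_UNKNOWN;               // a real handler returns its own DumpType
      }
    }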

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
new file mode 100644
index 0000000..f4239e5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import javax.annotation.Nullable;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+class AddPartitionHandler extends AbstractHandler {
+  protected AddPartitionHandler(NotificationEvent notificationEvent) {
+    super(notificationEvent);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
+    LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
+    Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
+    if ((ptns == null) || (!ptns.iterator().hasNext())) {
+      LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions", fromEventId());
+      return;
+    }
+    org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
+    if (tobj == null) {
+      LOG.debug("Event#{} was an ADD_PTN_EVENT with no table listed", fromEventId());
+      return;
+    }
+
+    final Table qlMdTable = new Table(tobj);
+    Iterable<Partition> qlPtns = Iterables.transform(
+        ptns,
+        new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
+          @Nullable
+          @Override
+          public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
+            if (input == null) {
+              return null;
+            }
+            try {
+              return new Partition(qlMdTable, input);
+            } catch (HiveException e) {
+              throw new IllegalArgumentException(e);
+            }
+          }
+        }
+    );
+
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    EximUtil.createExportDump(
+        metaDataPath.getFileSystem(withinContext.hiveConf),
+        metaDataPath,
+        qlMdTable,
+        qlPtns,
+        withinContext.replicationSpec);
+
+    Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
+    for (Partition qlPtn : qlPtns) {
+      Iterable<String> files = partitionFilesIter.next().getFiles();
+      if (files != null) {
+        // encoded filename/checksum of files, write into _files
+        try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
+          for (String file : files) {
+            fileListWriter.write(file + "\n");
+          }
+        }
+      }
+    }
+    withinContext.createDmd(this).write();
+  }
+
+  private BufferedWriter writer(Context withinContext, Partition qlPtn)
+      throws IOException {
+    Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
+    FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
+    Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_ADD_PARTITION;
+  }
+}
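
Assuming EximUtil.METADATA_NAME and EximUtil.FILES_NAME keep their usual "_metadata" and "_files" values, the event directory this handler lays out looks like:

    <eventRoot>/
      _metadata                   table + partition objects via createExportDump
      <partitionName1>/_files     one encoded filename/checksum per line
      <partitionName2>/_files
    plus the DumpMetaData written by withinContext.createDmd(this).write()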

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
new file mode 100644
index 0000000..8a7e742
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class AlterPartitionHandler extends AbstractHandler {
+  private final org.apache.hadoop.hive.metastore.api.Partition after;
+  private final org.apache.hadoop.hive.metastore.api.Table tableObject;
+  private final boolean isTruncateOp;
+  private final Scenario scenario;
+
+  AlterPartitionHandler(NotificationEvent event) throws Exception {
+    super(event);
+    AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
+    tableObject = apm.getTableObj();
+    org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
+    after = apm.getPtnObjAfter();
+    isTruncateOp = apm.getIsTruncateOp();
+    scenario = scenarioType(before, after);
+  }
+
+  private enum Scenario {
+    ALTER {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_ALTER_PARTITION;
+      }
+    },
+    RENAME {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_RENAME_PARTITION;
+      }
+    },
+    TRUNCATE {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_TRUNCATE_PARTITION;
+      }
+    };
+
+    abstract DumpType dumpType();
+  }
+
+  private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
+      org.apache.hadoop.hive.metastore.api.Partition after) {
+    Iterator<String> beforeValIter = before.getValuesIterator();
+    Iterator<String> afterValIter = after.getValuesIterator();
+    while(beforeValIter.hasNext()) {
+      if (!beforeValIter.next().equals(afterValIter.next())) {
+        return Scenario.RENAME;
+      }
+    }
+    return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
+
+    if (Scenario.ALTER == scenario) {
+      withinContext.replicationSpec.setIsMetadataOnly(true);
+      Table qlMdTable = new Table(tableObject);
+      List<Partition> partitions = new ArrayList<>();
+      partitions.add(new Partition(qlMdTable, after));
+      Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+      EximUtil.createExportDump(
+          metaDataPath.getFileSystem(withinContext.hiveConf),
+          metaDataPath,
+          qlMdTable,
+          partitions,
+          withinContext.replicationSpec);
+    }
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return scenario.dumpType();
+  }
+}
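
A concrete reading of the Scenario resolution above, for a table partitioned by (year, month):

    // before = (2017, 04), after = (2017, 05)                        -> RENAME
    // before = (2017, 04), after = (2017, 04), isTruncateOp == true  -> TRUNCATE
    // before = (2017, 04), after = (2017, 04), isTruncateOp == false -> ALTER
    // Only plain ALTERs re-export the partition metadata; RENAME and TRUNCATE
    // dumps carry just the message payload for the load side to interpret.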

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
new file mode 100644
index 0000000..f457f23
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class AlterTableHandler extends AbstractHandler {
+  private final org.apache.hadoop.hive.metastore.api.Table after;
+  private final boolean isTruncateOp;
+  private final Scenario scenario;
+
+  private enum Scenario {
+    ALTER {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_ALTER_TABLE;
+      }
+    },
+    RENAME {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_RENAME_TABLE;
+      }
+    },
+    TRUNCATE {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_TRUNCATE_TABLE;
+      }
+    };
+
+    abstract DumpType dumpType();
+  }
+
+  AlterTableHandler(NotificationEvent event) throws Exception {
+    super(event);
+    AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
+    org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore();
+    after = atm.getTableObjAfter();
+    isTruncateOp = atm.getIsTruncateOp();
+    scenario = scenarioType(before, after);
+  }
+
+  private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
+      org.apache.hadoop.hive.metastore.api.Table after) {
+    if (before.getDbName().equals(after.getDbName())
+        && before.getTableName().equals(after.getTableName())) {
+      return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
+    } else {
+      return Scenario.RENAME;
+    }
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    {
+      LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
+      if (Scenario.ALTER == scenario) {
+        withinContext.replicationSpec.setIsMetadataOnly(true);
+        Table qlMdTableAfter = new Table(after);
+        Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+        EximUtil.createExportDump(
+            metaDataPath.getFileSystem(withinContext.hiveConf),
+            metaDataPath,
+            qlMdTableAfter,
+            null,
+            withinContext.replicationSpec);
+      }
+      DumpMetaData dmd = withinContext.createDmd(this);
+      dmd.setPayload(event.getMessage());
+      dmd.write();
+    }
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return scenario.dumpType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
new file mode 100644
index 0000000..bebf035
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+class CreateFunctionHandler extends AbstractHandler {
+  CreateFunctionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    CreateFunctionMessage createFunctionMessage =
+        deserializer.getCreateFunctionMessage(event.getMessage());
+    LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), event.getMessage());
+    Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
+
+    try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
+      new FunctionSerializer(createFunctionMessage.getFunctionObj())
+          .writeTo(jsonWriter, withinContext.replicationSpec);
+    }
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_CREATE_FUNCTION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
new file mode 100644
index 0000000..ca3607f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+class CreateTableHandler extends AbstractHandler {
+
+  CreateTableHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
+    LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
+    org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
+
+    if (tobj == null) {
+      LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed", fromEventId());
+      return;
+    }
+
+    Table qlMdTable = new Table(tobj);
+    if (qlMdTable.isView()) {
+      withinContext.replicationSpec.setIsMetadataOnly(true);
+    }
+
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    EximUtil.createExportDump(
+        metaDataPath.getFileSystem(withinContext.hiveConf),
+        metaDataPath,
+        qlMdTable,
+        null,
+        withinContext.replicationSpec);
+
+    Path dataPath = new Path(withinContext.eventRoot, "data");
+    Iterable<String> files = ctm.getFiles();
+    if (files != null) {
+      // encoded filename/checksum of files, write into _files
+      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+        for (String file : files) {
+          fileListWriter.write(file + "\n");
+        }
+      }
+    }
+    withinContext.createDmd(this).write();
+  }
+
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_CREATE_TABLE;
+  }
+}
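
Unlike AddPartitionHandler's per-partition directories, the file list here lands under a single data directory (again assuming the usual "_metadata"/"_files" constant values):

    <eventRoot>/
      _metadata       table object only; marked metadata-only when the table is a view
      data/_files     one encoded filename/checksum per line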

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
new file mode 100644
index 0000000..0d4665a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DefaultHandler extends AbstractHandler {
+
+  DefaultHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_UNKNOWN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
new file mode 100644
index 0000000..a4eacc4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DropPartitionHandler extends AbstractHandler {
+
+  DropPartitionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_PARTITION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
new file mode 100644
index 0000000..40cd5cb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DropTableHandler extends AbstractHandler {
+
+  DropTableHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_TABLE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
new file mode 100644
index 0000000..c0fa7b2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+public interface EventHandler {
+  void handle(Context withinContext) throws Exception;
+
+  long fromEventId();
+
+  long toEventId();
+
+  DumpType dumpType();
+
+  class Context {
+    final Path eventRoot, cmRoot;
+    final Hive db;
+    final HiveConf hiveConf;
+    final ReplicationSpec replicationSpec;
+
+    public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
+        ReplicationSpec replicationSpec) {
+      this.eventRoot = eventRoot;
+      this.cmRoot = cmRoot;
+      this.db = db;
+      this.hiveConf = hiveConf;
+      this.replicationSpec = replicationSpec;
+    }
+
+    DumpMetaData createDmd(EventHandler eventHandler) {
+      return new DumpMetaData(
+          eventRoot,
+          eventHandler.dumpType(),
+          eventHandler.fromEventId(),
+          eventHandler.toEventId(),
+          cmRoot, hiveConf
+      );
+    }
+  }
+}

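Implementations of this interface only need to supply handle() and dumpType();
fromEventId()/toEventId() come from AbstractHandler, which maps both to the single event id.
As a sketch, a pass-through handler (hypothetical class, mirroring the DefaultHandler that
ships in this patch) reduces to:

    class NoOpHandler extends AbstractHandler {   // illustrative only, not part of the patch
      NoOpHandler(NotificationEvent event) {
        super(event);
      }

      @Override
      public void handle(Context withinContext) throws Exception {
        // write only the dump metadata, carrying the raw event message as payload
        DumpMetaData dmd = withinContext.createDmd(this);
        dmd.setPayload(event.getMessage());
        dmd.write();
      }

      @Override
      public DumpType dumpType() {
        return DumpType.EVENT_UNKNOWN;
      }
    }
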
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
new file mode 100644
index 0000000..08dbd13
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventHandlerFactory {
+  private EventHandlerFactory() {
+  }
+
+  private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>();
+
+  static {
+    register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
+    register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
+    register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
+    register(MessageFactory.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
+    register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
+    register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
+    register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
+    register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+  }
+
+  static void register(String event, Class<? extends EventHandler> handlerClazz) {
+    try {
+      Constructor<? extends EventHandler> constructor =
+          handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+      assert constructor != null;
+      assert !Modifier.isPrivate(constructor.getModifiers());
+      registeredHandlers.put(event, handlerClazz);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+          + " does not have a constructor with a single parameter of type: "
+          + NotificationEvent.class.getCanonicalName(), e);
+    }
+  }
+
+  public static EventHandler handlerFor(NotificationEvent event) {
+    if (registeredHandlers.containsKey(event.getEventType())) {
+      Class<? extends EventHandler> handlerClazz = registeredHandlers.get(event.getEventType());
+      try {
+        Constructor<? extends EventHandler> constructor =
+            handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+        return constructor.newInstance(event);
+      } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+        // This should never happen, since register() validates the constructor; propagate regardless.
+        throw new RuntimeException(
+            "failed to create handler for " + event.getEventType()
+                + " using handler class " + handlerClazz.getCanonicalName(), e);
+      }
+    }
+    return new DefaultHandler(event);
+  }
+}

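Note that register() validates up front that every handler class exposes a non-private
constructor taking a single NotificationEvent, so the reflective instantiation in handlerFor()
should not fail for registered types. A usage sketch for the dump loop (event and context
construction elided; variable names illustrative):

    // unregistered event types fall back to DefaultHandler, which dumps EVENT_UNKNOWN
    EventHandler handler = EventHandlerFactory.handlerFor(event);
    handler.handle(context);
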
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
new file mode 100644
index 0000000..0393701
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.thrift.TException;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class InsertHandler extends AbstractHandler {
+
+  InsertHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
+    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
+    Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+    List<Partition> qlPtns = null;
+    if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+      qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
+    }
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+
+    // Set the replace flag depending on whether this was an INSERT INTO or an INSERT OVERWRITE operation
+    withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
+    EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
+        qlMdTable, qlPtns,
+        withinContext.replicationSpec);
+    Iterable<String> files = insertMsg.getFiles();
+
+    if (files != null) {
+      Path dataPath;
+      if ((null == qlPtns) || qlPtns.isEmpty()) {
+        dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      } else {
+        /*
+         * An insert into/overwrite operation may touch one or more partitions, even partitions of
+         * multiple tables, but an insert event is generated per partition into which data is
+         * written. So the qlPtns list will have exactly one entry here.
+         */
+        assert(1 == qlPtns.size());
+        dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
+      }
+
+      // write the encoded filename/checksum of each file, one per line, into _files
+      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+        for (String file : files) {
+          fileListWriter.write(file + "\n");
+        }
+      }
+    }
+
+    LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  private org.apache.hadoop.hive.ql.metadata.Table tableObject(
+      Context withinContext, InsertMessage insertMsg) throws TException {
+    return new org.apache.hadoop.hive.ql.metadata.Table(
+        withinContext.db.getMSC().getTable(
+            insertMsg.getDB(), insertMsg.getTable()
+        )
+    );
+  }
+
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_INSERT;
+  }
+}

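The paths built above give each insert event a self-contained dump directory. Assuming the
usual EximUtil constants ("_metadata" for METADATA_NAME, "data" for DATA_PATH_NAME, "_files"
for FILES_NAME), the resulting layout is roughly:

    <eventRoot>/_metadata                 table (and partition) metadata from createExportDump
    <eventRoot>/data/_files               unpartitioned table: one encoded filename/checksum per line
    <eventRoot>/<partitionName>/_files    partitioned table: exactly one partition per insert event
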
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
deleted file mode 100644
index ab059c2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractHandler implements EventHandler {
-  static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
-
-  final NotificationEvent event;
-  final MessageDeserializer deserializer;
-
-  AbstractHandler(NotificationEvent event) {
-    this.event = event;
-    deserializer = MessageFactory.getInstance().getDeserializer();
-  }
-
-  @Override
-  public long fromEventId() {
-    return event.getEventId();
-  }
-
-  @Override
-  public long toEventId() {
-    return event.getEventId();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
deleted file mode 100644
index 1616ab9..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import javax.annotation.Nullable;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Iterator;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public class AddPartitionHandler extends AbstractHandler {
-  protected AddPartitionHandler(NotificationEvent notificationEvent) {
-    super(notificationEvent);
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
-    LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
-    Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
-    if ((ptns == null) || (!ptns.iterator().hasNext())) {
-      LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");
-      return;
-    }
-    org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
-    if (tobj == null) {
-      LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
-      return;
-    }
-
-    final Table qlMdTable = new Table(tobj);
-    Iterable<Partition> qlPtns = Iterables.transform(
-        ptns,
-        new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
-          @Nullable
-          @Override
-          public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
-            if (input == null) {
-              return null;
-            }
-            try {
-              return new Partition(qlMdTable, input);
-            } catch (HiveException e) {
-              throw new IllegalArgumentException(e);
-            }
-          }
-        }
-    );
-
-    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-    EximUtil.createExportDump(
-        metaDataPath.getFileSystem(withinContext.hiveConf),
-        metaDataPath,
-        qlMdTable,
-        qlPtns,
-        withinContext.replicationSpec);
-
-    Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
-    for (Partition qlPtn : qlPtns) {
-      Iterable<String> files = partitionFilesIter.next().getFiles();
-      if (files != null) {
-        // encoded filename/checksum of files, write into _files
-        try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
-          for (String file : files) {
-            fileListWriter.write(file + "\n");
-          }
-        }
-      }
-    }
-    withinContext.createDmd(this).write();
-  }
-
-  private BufferedWriter writer(Context withinContext, Partition qlPtn)
-      throws IOException {
-    Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
-    FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
-    Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return DumpType.EVENT_ADD_PARTITION;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
deleted file mode 100644
index b6c3496..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class AlterPartitionHandler extends AbstractHandler {
-  private final org.apache.hadoop.hive.metastore.api.Partition after;
-  private final org.apache.hadoop.hive.metastore.api.Table tableObject;
-  private final boolean isTruncateOp;
-  private final Scenario scenario;
-
-  AlterPartitionHandler(NotificationEvent event) throws Exception {
-    super(event);
-    AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
-    tableObject = apm.getTableObj();
-    org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
-    after = apm.getPtnObjAfter();
-    isTruncateOp = apm.getIsTruncateOp();
-    scenario = scenarioType(before, after);
-  }
-
-  private enum Scenario {
-    ALTER {
-      @Override
-      DumpType dumpType() {
-        return DumpType.EVENT_ALTER_PARTITION;
-      }
-    },
-    RENAME {
-      @Override
-      DumpType dumpType() {
-        return DumpType.EVENT_RENAME_PARTITION;
-      }
-    },
-    TRUNCATE {
-      @Override
-      DumpType dumpType() {
-        return DumpType.EVENT_TRUNCATE_PARTITION;
-      }
-    };
-
-    abstract DumpType dumpType();
-  }
-
-  private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
-      org.apache.hadoop.hive.metastore.api.Partition after) {
-    Iterator<String> beforeValIter = before.getValuesIterator();
-    Iterator<String> afterValIter = after.getValuesIterator();
-    while(beforeValIter.hasNext()) {
-      if (!beforeValIter.next().equals(afterValIter.next())) {
-        return Scenario.RENAME;
-      }
-    }
-    return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
-
-    if (Scenario.ALTER == scenario) {
-      withinContext.replicationSpec.setIsMetadataOnly(true);
-      Table qlMdTable = new Table(tableObject);
-      List<Partition> partitions = new ArrayList<>();
-      partitions.add(new Partition(qlMdTable, after));
-      Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-      EximUtil.createExportDump(
-          metaDataPath.getFileSystem(withinContext.hiveConf),
-          metaDataPath,
-          qlMdTable,
-          partitions,
-          withinContext.replicationSpec);
-    }
-    DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
-    dmd.write();
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return scenario.dumpType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
deleted file mode 100644
index d553240..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class AlterTableHandler extends AbstractHandler {
-  private final org.apache.hadoop.hive.metastore.api.Table after;
-  private final boolean isTruncateOp;
-  private final Scenario scenario;
-
-  private enum Scenario {
-    ALTER {
-      @Override
-      DumpType dumpType() {
-        return DumpType.EVENT_ALTER_TABLE;
-      }
-    },
-    RENAME {
-      @Override
-      DumpType dumpType() {
-        return DumpType.EVENT_RENAME_TABLE;
-      }
-    },
-    TRUNCATE {
-      @Override
-      DumpType dumpType() {
-        return DumpType.EVENT_TRUNCATE_TABLE;
-      }
-    };
-
-    abstract DumpType dumpType();
-  }
-
-  AlterTableHandler(NotificationEvent event) throws Exception {
-    super(event);
-    AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
-    org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore();
-    after = atm.getTableObjAfter();
-    isTruncateOp = atm.getIsTruncateOp();
-    scenario = scenarioType(before, after);
-  }
-
-  private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
-      org.apache.hadoop.hive.metastore.api.Table after) {
-    if (before.getDbName().equals(after.getDbName())
-        && before.getTableName().equals(after.getTableName())) {
-      return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
-    } else {
-      return Scenario.RENAME;
-    }
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    {
-      LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
-      if (Scenario.ALTER == scenario) {
-        withinContext.replicationSpec.setIsMetadataOnly(true);
-        Table qlMdTableAfter = new Table(after);
-        Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-        EximUtil.createExportDump(
-            metaDataPath.getFileSystem(withinContext.hiveConf),
-            metaDataPath,
-            qlMdTableAfter,
-            null,
-            withinContext.replicationSpec);
-      }
-      DumpMetaData dmd = withinContext.createDmd(this);
-      dmd.setPayload(event.getMessage());
-      dmd.write();
-    }
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return scenario.dumpType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
deleted file mode 100644
index 88600fd..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public class CreateTableHandler extends AbstractHandler {
-
-  CreateTableHandler(NotificationEvent event) {
-    super(event);
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
-    LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
-    org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
-
-    if (tobj == null) {
-      LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed");
-      return;
-    }
-
-    Table qlMdTable = new Table(tobj);
-    if (qlMdTable.isView()) {
-      withinContext.replicationSpec.setIsMetadataOnly(true);
-    }
-
-    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-    EximUtil.createExportDump(
-        metaDataPath.getFileSystem(withinContext.hiveConf),
-        metaDataPath,
-        qlMdTable,
-        null,
-        withinContext.replicationSpec);
-
-    Path dataPath = new Path(withinContext.eventRoot, "data");
-    Iterable<String> files = ctm.getFiles();
-    if (files != null) {
-      // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          fileListWriter.write(file + "\n");
-        }
-      }
-    }
-    withinContext.createDmd(this).write();
-  }
-
-  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
-    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
-    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return DumpType.EVENT_CREATE_TABLE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
deleted file mode 100644
index 78cd74f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class DefaultHandler extends AbstractHandler {
-
-  DefaultHandler(NotificationEvent event) {
-    super(event);
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
-    DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
-    dmd.write();
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return DumpType.EVENT_UNKNOWN;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
deleted file mode 100644
index c4a0908..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class DropPartitionHandler extends AbstractHandler {
-
-  DropPartitionHandler(NotificationEvent event) {
-    super(event);
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
-    DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
-    dmd.write();
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return DumpType.EVENT_DROP_PARTITION;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
deleted file mode 100644
index e3addaf..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-
-public class DropTableHandler extends AbstractHandler {
-
-  DropTableHandler(NotificationEvent event) {
-    super(event);
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
-    DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
-    dmd.write();
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return DumpType.EVENT_DROP_TABLE;
-  }
-}