Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/04 09:49:31 UTC

[1/2] hive git commit: HIVE-16267 : Enable bootstrap function metadata to be loaded in repl load (Anishek Agarwal, reviewed by Sushanth Sowmyan)

Repository: hive
Updated Branches:
  refs/heads/master f56abb405 -> 9e9356b5e


http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
new file mode 100644
index 0000000..077d39b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class PartitionSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME = "partitions";
+  private Partition partition;
+
+  PartitionSerializer(Partition partition) {
+    this.partition = partition;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    try {
+      if (additionalPropertiesProvider.isInReplicationScope()) {
+        partition.putToParameters(
+            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+            additionalPropertiesProvider.getCurrentReplicationState());
+        if (isPartitionExternal()) {
+          // Replication destination will not be external
+          partition.putToParameters("EXTERNAL", "FALSE");
+        }
+      }
+      writer.jsonGenerator.writeString(serializer.toString(partition, UTF_8));
+      writer.jsonGenerator.flush();
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+
+  private boolean isPartitionExternal() {
+    Map<String, String> params = partition.getParameters();
+    return params.containsKey("EXTERNAL")
+        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
+  }
+}
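
For context: JsonWriter (moved into this io package by the patch, but not included
in this part of the message) wraps the Jackson jsonGenerator used above, and its
nested Serializer interface supplies both the writeTo contract and the UTF_8
constant that PartitionSerializer references. A minimal sketch of that interface,
inferred purely from the usages visible in this diff:

    public interface Serializer {
      // Presumably the charset name handed to Thrift's TSerializer/TDeserializer;
      // the exact value lives in JsonWriter.java, which is not shown here.
      String UTF_8 = "UTF-8";

      void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
          throws SemanticException, IOException;
    }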

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java
new file mode 100644
index 0000000..3a92e8a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.IOException;
+
+public class ReplicationSpecSerializer implements JsonWriter.Serializer {
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()) {
+      String value = additionalPropertiesProvider.get(key);
+      if (value != null) {
+        writer.jsonGenerator.writeStringField(key.toString(), value);
+      }
+    }
+  }
+}
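
ReplicationSpecSerializer contributes one top-level string field per
ReplicationSpec.KEY that has a value; MetadataJson.readReplicationSpec (later in
this patch) reads the same fields back by name, so the two sides must agree on
key.toString(). The KEY enum is not part of this message, so the dotted field
names below are assumptions, shown only to illustrate the fragment this loop
emits:

    ..., "repl.scope":"all", "repl.last.id":"57", ...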

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
new file mode 100644
index 0000000..948cb39
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TableSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME = "table";
+  private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
+  private final Iterable<Partition> partitions;
+
+  public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle,
+      Iterable<Partition> partitions) {
+    this.tableHandle = tableHandle;
+    this.partitions = partitions;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    if (cannotReplicateTable(additionalPropertiesProvider)) {
+      return;
+    }
+
+    Table tTable = tableHandle.getTTable();
+    tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
+    try {
+      TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+      writer.jsonGenerator
+          .writeStringField(FIELD_NAME, serializer.toString(tTable, UTF_8));
+      writer.jsonGenerator.writeFieldName(PartitionSerializer.FIELD_NAME);
+      writePartitions(writer, additionalPropertiesProvider);
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+
+  private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvider) {
+    return tableHandle == null || additionalPropertiesProvider.isNoop();
+  }
+
+  private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    if (additionalPropertiesProvider.isInReplicationScope()) {
+      table.putToParameters(
+            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+            additionalPropertiesProvider.getCurrentReplicationState());
+      if (isExternalTable(table)) {
+        // Replication destination will not be external - override if set
+        table.putToParameters("EXTERNAL", "FALSE");
+      }
+      if (isExternalTableType(table)) {
+        // Replication destination will not be external - override if set
+        table.setTableType(TableType.MANAGED_TABLE.toString());
+      }
+    } else {
+      // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
+      // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
+      // TODO: if we want to be explicit about this dump not being a replication dump, we can
+      // uncomment this else section, but currently unneeded. Will require a lot of golden file
+      // regen if we do so.
+    }
+    return table;
+  }
+
+  private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) {
+    return table.isSetTableType()
+        && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
+  }
+
+  private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) {
+    Map<String, String> params = table.getParameters();
+    return params.containsKey("EXTERNAL")
+        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
+  }
+
+  private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    writer.jsonGenerator.writeStartArray();
+    if (partitions != null) {
+      for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
+        new PartitionSerializer(partition.getTPartition())
+            .writeTo(writer, additionalPropertiesProvider);
+      }
+    }
+    writer.jsonGenerator.writeEndArray();
+  }
+}
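
Taken together, TableSerializer and PartitionSerializer emit a document of
roughly the following shape, where each quoted payload is a Thrift
TJSONProtocol rendering of the corresponding metastore object (payloads
abbreviated here; the version header and ReplicationSpec fields are written by
the other serializers in this package):

    {
      "table": "<thrift-json Table>",
      "partitions": ["<thrift-json Partition>", "<thrift-json Partition>"]
    }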

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java
new file mode 100644
index 0000000..8201173
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION;
+
+/**
+ * This is not used as of now, since the conditional that led to its usage was always false;
+ * we have therefore removed the conditional and the usage of this class, but it might be required in future.
+ */
+public class VersionCompatibleSerializer implements JsonWriter.Serializer {
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    writer.jsonGenerator.writeStringField("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
index 9a4f8b9..1616ab9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
@@ -35,7 +35,7 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.Iterator;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
 public class AddPartitionHandler extends AbstractHandler {
   protected AddPartitionHandler(NotificationEvent notificationEvent) {
@@ -108,7 +108,7 @@ public class AddPartitionHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
-    return DUMPTYPE.EVENT_ADD_PARTITION;
+  public DumpType dumpType() {
+    return DumpType.EVENT_ADD_PARTITION;
   }
 }
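
The handler changes in this and the following files replace the nested
ReplicationSemanticAnalyzer.DUMPTYPE with the new top-level DumpType enum
(added by this patch as ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java,
45 lines per the diffstat, but not included in this part of the message). A
sketch of its likely shape, listing only the constants that actually appear in
these diffs plus an assumed BOOTSTRAP counterpart to INCREMENTAL:

    package org.apache.hadoop.hive.ql.parse.repl;

    public enum DumpType {
      BOOTSTRAP,       // assumed; only INCREMENTAL is referenced in this message
      INCREMENTAL,     // used by DumpMetaData.isIncrementalDump() below
      EVENT_CREATE_TABLE,
      EVENT_ADD_PARTITION,
      EVENT_DROP_TABLE,
      EVENT_DROP_PARTITION,
      EVENT_ALTER_TABLE,
      EVENT_RENAME_TABLE,
      EVENT_TRUNCATE_TABLE,
      EVENT_ALTER_PARTITION,
      EVENT_RENAME_PARTITION,
      EVENT_TRUNCATE_PARTITION,
      EVENT_INSERT,
      EVENT_UNKNOWN
    }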

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
index 20d04dc..b6c3496 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
@@ -23,14 +23,14 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 public class AlterPartitionHandler extends AbstractHandler {
   private final org.apache.hadoop.hive.metastore.api.Partition after;
@@ -51,24 +51,24 @@ public class AlterPartitionHandler extends AbstractHandler {
   private enum Scenario {
     ALTER {
       @Override
-      DUMPTYPE dumpType() {
-        return DUMPTYPE.EVENT_ALTER_PARTITION;
+      DumpType dumpType() {
+        return DumpType.EVENT_ALTER_PARTITION;
       }
     },
     RENAME {
       @Override
-      DUMPTYPE dumpType() {
-        return DUMPTYPE.EVENT_RENAME_PARTITION;
+      DumpType dumpType() {
+        return DumpType.EVENT_RENAME_PARTITION;
       }
     },
     TRUNCATE {
       @Override
-      DUMPTYPE dumpType() {
-        return DUMPTYPE.EVENT_TRUNCATE_PARTITION;
+      DumpType dumpType() {
+        return DumpType.EVENT_TRUNCATE_PARTITION;
       }
     };
 
-    abstract DUMPTYPE dumpType();
+    abstract DumpType dumpType();
   }
 
   private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
@@ -90,14 +90,14 @@ public class AlterPartitionHandler extends AbstractHandler {
     if (Scenario.ALTER == scenario) {
       withinContext.replicationSpec.setIsMetadataOnly(true);
       Table qlMdTable = new Table(tableObject);
-      List<Partition> qlPtns = new ArrayList<>();
-      qlPtns.add(new Partition(qlMdTable, after));
+      List<Partition> partitions = new ArrayList<>();
+      partitions.add(new Partition(qlMdTable, after));
       Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
       EximUtil.createExportDump(
           metaDataPath.getFileSystem(withinContext.hiveConf),
           metaDataPath,
           qlMdTable,
-          qlPtns,
+          partitions,
           withinContext.replicationSpec);
     }
     DumpMetaData dmd = withinContext.createDmd(this);
@@ -106,7 +106,7 @@ public class AlterPartitionHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
+  public DumpType dumpType() {
     return scenario.dumpType();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
index bfe0181..d553240 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
@@ -22,13 +22,12 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 public class AlterTableHandler extends AbstractHandler {
-  private final org.apache.hadoop.hive.metastore.api.Table before;
   private final org.apache.hadoop.hive.metastore.api.Table after;
   private final boolean isTruncateOp;
   private final Scenario scenario;
@@ -36,30 +35,30 @@ public class AlterTableHandler extends AbstractHandler {
   private enum Scenario {
     ALTER {
       @Override
-      DUMPTYPE dumpType() {
-        return DUMPTYPE.EVENT_ALTER_TABLE;
+      DumpType dumpType() {
+        return DumpType.EVENT_ALTER_TABLE;
       }
     },
     RENAME {
       @Override
-      DUMPTYPE dumpType() {
-        return DUMPTYPE.EVENT_RENAME_TABLE;
+      DumpType dumpType() {
+        return DumpType.EVENT_RENAME_TABLE;
       }
     },
     TRUNCATE {
       @Override
-      DUMPTYPE dumpType() {
-        return DUMPTYPE.EVENT_TRUNCATE_TABLE;
+      DumpType dumpType() {
+        return DumpType.EVENT_TRUNCATE_TABLE;
       }
     };
 
-    abstract DUMPTYPE dumpType();
+    abstract DumpType dumpType();
   }
 
   AlterTableHandler(NotificationEvent event) throws Exception {
     super(event);
     AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
-    before = atm.getTableObjBefore();
+    org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore();
     after = atm.getTableObjAfter();
     isTruncateOp = atm.getIsTruncateOp();
     scenario = scenarioType(before, after);
@@ -97,7 +96,7 @@ public class AlterTableHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
+  public DumpType dumpType() {
     return scenario.dumpType();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
index 03f400d..88600fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
@@ -28,7 +28,7 @@ import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
 public class CreateTableHandler extends AbstractHandler {
 
@@ -80,7 +80,7 @@ public class CreateTableHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
-    return DUMPTYPE.EVENT_CREATE_TABLE;
+  public DumpType dumpType() {
+    return DumpType.EVENT_CREATE_TABLE;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
index 61c5f37..78cd74f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.parse.repl.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 public class DefaultHandler extends AbstractHandler {
 
@@ -37,7 +38,7 @@ public class DefaultHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
-    return DUMPTYPE.EVENT_UNKNOWN;
+  public DumpType dumpType() {
+    return DumpType.EVENT_UNKNOWN;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
index 3ad794e..c4a0908 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.parse.repl.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 public class DropPartitionHandler extends AbstractHandler {
 
@@ -37,7 +38,7 @@ public class DropPartitionHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
-    return DUMPTYPE.EVENT_DROP_PARTITION;
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_PARTITION;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
index cae379b..e3addaf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.parse.repl.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 public class DropTableHandler extends AbstractHandler {
 
@@ -37,7 +38,7 @@ public class DropTableHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
-    return DUMPTYPE.EVENT_DROP_TABLE;
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_TABLE;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
index 199145a..29f3b42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
 public interface EventHandler {
   void handle(Context withinContext) throws Exception;
@@ -32,7 +32,7 @@ public interface EventHandler {
 
   long toEventId();
 
-  DUMPTYPE dumpType();
+  DumpType dumpType();
 
   class Context {
     final Path eventRoot, cmRoot;

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
index e9f2a6a..910b396 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
@@ -32,8 +32,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 public class InsertHandler extends AbstractHandler {
 
@@ -103,7 +104,7 @@ public class InsertHandler extends AbstractHandler {
   }
 
   @Override
-  public DUMPTYPE dumpType() {
-    return DUMPTYPE.EVENT_INSERT;
+  public DumpType dumpType() {
+    return DumpType.EVENT_INSERT;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
new file mode 100644
index 0000000..12ad19b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+
+public class DumpMetaData {
+  // wrapper class for reading and writing metadata about a dump
+  // responsible for _dumpmetadata files
+  private static final String DUMP_METADATA = "_dumpmetadata";
+
+  private DumpType dumpType;
+  private Long eventFrom = null;
+  private Long eventTo = null;
+  private String payload = null;
+  private boolean initialized = false;
+
+  private final Path dumpFile;
+  private final HiveConf hiveConf;
+  private Path cmRoot;
+
+  public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    dumpFile = new Path(dumpRoot, DUMP_METADATA);
+  }
+
+  public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot,
+      HiveConf hiveConf) {
+    this(dumpRoot, hiveConf);
+    setDump(lvl, eventFrom, eventTo, cmRoot);
+  }
+
+  public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot) {
+    this.dumpType = lvl;
+    this.eventFrom = eventFrom;
+    this.eventTo = eventTo;
+    this.initialized = true;
+    this.cmRoot = cmRoot;
+  }
+
+  private void loadDumpFromFile() throws SemanticException {
+    try {
+      // read from dumpfile and instantiate self
+      FileSystem fs = dumpFile.getFileSystem(hiveConf);
+      BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
+      String line = null;
+      if ((line = br.readLine()) != null) {
+        String[] lineContents = line.split("\t", 5);
+        setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]),
+            Long.valueOf(lineContents[2]),
+            new Path(lineContents[3]));
+        setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]);
+        ReplChangeManager.setCmRoot(cmRoot);
+      } else {
+        throw new IOException(
+            "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString());
+      }
+    } catch (IOException ioe) {
+      throw new SemanticException(ioe);
+    }
+  }
+
+  public DumpType getDumpType() throws SemanticException {
+    initializeIfNot();
+    return this.dumpType;
+  }
+
+  public String getPayload() throws SemanticException {
+    initializeIfNot();
+    return this.payload;
+  }
+
+  public void setPayload(String payload) {
+    this.payload = payload;
+  }
+
+  public Long getEventFrom() throws SemanticException {
+    initializeIfNot();
+    return eventFrom;
+  }
+
+  public Long getEventTo() throws SemanticException {
+    initializeIfNot();
+    return eventTo;
+  }
+
+  public Path getDumpFilePath() {
+    return dumpFile;
+  }
+
+  public boolean isIncrementalDump() throws SemanticException {
+    initializeIfNot();
+    return (this.dumpType == DumpType.INCREMENTAL);
+  }
+
+  private void initializeIfNot() throws SemanticException {
+    if (!initialized) {
+      loadDumpFromFile();
+    }
+  }
+
+
+  public void write() throws SemanticException {
+    Utils.writeOutput(
+        Arrays.asList(
+            dumpType.toString(),
+            eventFrom.toString(),
+            eventTo.toString(),
+            cmRoot.toString(),
+            payload),
+        dumpFile,
+        hiveConf
+    );
+  }
+}
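
write() and loadDumpFromFile() above agree on a single-line, tab-separated
layout for the _dumpmetadata file: dump type, from-event id, to-event id,
cmRoot path, and payload. This assumes Utils.writeOutput (added by this patch,
not shown here) joins the values with tabs on one line, which is what the
split("\t", 5) on the read path expects. A hypothetical example line for an
incremental dump, with <TAB> standing for a tab character; the last column
holds Utilities.nullStringOutput when the payload is null (written here as
NULL, though the exact token is whatever that constant contains):

    INCREMENTAL<TAB>21<TAB>57<TAB>hdfs://nn:8020/cmroot<TAB>NULL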

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
new file mode 100644
index 0000000..fc02dfd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+/**
+ * Utility class to help return a complex value from the readMetaData function.
+ */
+public class MetaData {
+  private final Database db;
+  private final Table table;
+  private final Iterable<Partition> partitions;
+  private final ReplicationSpec replicationSpec;
+  public final Function function;
+
+  public MetaData() {
+    this(null, null, null, new ReplicationSpec(), null);
+  }
+
+  MetaData(Database db, Table table, Iterable<Partition> partitions,
+      ReplicationSpec replicationSpec, Function function) {
+    this.db = db;
+    this.table = table;
+    this.partitions = partitions;
+    this.replicationSpec = replicationSpec;
+    this.function = function;
+  }
+
+  public Database getDatabase() {
+    return db;
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public Iterable<Partition> getPartitions() {
+    return partitions;
+  }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
new file mode 100644
index 0000000..b7a5680
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.PartitionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.TableSerializer;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter.Serializer.UTF_8;
+
+public class MetadataJson {
+  private final JSONObject json;
+  private final TDeserializer deserializer;
+  private final String tableDesc;
+
+  public MetadataJson(String message) throws JSONException, SemanticException {
+    deserializer = new TDeserializer(new TJSONProtocol.Factory());
+    json = new JSONObject(message);
+    checkCompatibility();
+    tableDesc = jsonEntry(TableSerializer.FIELD_NAME);
+  }
+
+  public MetaData getMetaData() throws TException, JSONException {
+    return new MetaData(
+        database(),
+        table(),
+        partitions(),
+        readReplicationSpec(),
+        function()
+    );
+  }
+
+  private Function function() throws TException {
+    return deserialize(new Function(), jsonEntry(FunctionSerializer.FIELD_NAME));
+  }
+
+  private Database database() throws TException {
+    return deserialize(new Database(), jsonEntry(DBSerializer.FIELD_NAME));
+  }
+
+  private Table table() throws TException {
+    return deserialize(new Table(), tableDesc);
+  }
+
+  private <T extends TBase> T deserialize(T intoObject, String json) throws TException {
+    if (json == null) {
+      return null;
+    }
+    deserializer.deserialize(intoObject, json, UTF_8);
+    return intoObject;
+  }
+
+  private List<Partition> partitions() throws JSONException, TException {
+    if (tableDesc == null) {
+      return null;
+    }
+    // TODO : jackson-streaming-iterable-redo this
+    JSONArray jsonPartitions = new JSONArray(json.getString(PartitionSerializer.FIELD_NAME));
+    List<Partition> partitionsList = new ArrayList<>(jsonPartitions.length());
+    for (int i = 0; i < jsonPartitions.length(); ++i) {
+      String partDesc = jsonPartitions.getString(i);
+      partitionsList.add(deserialize(new Partition(), partDesc));
+    }
+    return partitionsList;
+  }
+
+  private ReplicationSpec readReplicationSpec() {
+    com.google.common.base.Function<String, String> keyFetcher =
+        new com.google.common.base.Function<String, String>() {
+          @Override
+          public String apply(@Nullable String s) {
+            return jsonEntry(s);
+          }
+        };
+    return new ReplicationSpec(keyFetcher);
+  }
+
+  private void checkCompatibility() throws SemanticException, JSONException {
+    String version = json.getString("version");
+    String fcVersion = jsonEntry("fcversion");
+    EximUtil.doCheckCompatibility(
+        EximUtil.METADATA_FORMAT_VERSION,
+        version,
+        fcVersion);
+  }
+
+  private String jsonEntry(String forName) {
+    try {
+      return json.getString(forName);
+    } catch (JSONException ignored) {
+      return null;
+    }
+  }
+}
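
MetadataJson is the load-side counterpart of the serializers in
ql/parse/repl/dump/io: it parses the _metadata document those serializers
write. A sketch of that document's layout as the dump side produces it; the
"db" and "function" names stand for DBSerializer.FIELD_NAME and
FunctionSerializer.FIELD_NAME, whose exact values are not shown in this
message, and the repl.* keys are the assumed ReplicationSpec.KEY names:

    {
      "version": "<METADATA_FORMAT_VERSION>",
      "fcversion": "<forward-compatible version, optional>",
      "db": "<thrift-json Database>",
      "table": "<thrift-json Table>",
      "partitions": ["<thrift-json Partition>", ...],
      "function": "<thrift-json Function>",
      "repl.scope": "...",
      "repl.last.id": "..."
    }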

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java
new file mode 100644
index 0000000..3028e76
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HiveWrapperTest {
+  @Mock
+  private HiveWrapper.Tuple.Function<ReplicationSpec> specFunction;
+  @Mock
+  private HiveWrapper.Tuple.Function<Table> tableFunction;
+
+  @Test
+  public void replicationIdIsRequestedBeforeObjectDefinition() throws HiveException {
+    new HiveWrapper.Tuple<>(specFunction, tableFunction);
+    InOrder inOrder = Mockito.inOrder(specFunction, tableFunction);
+    inOrder.verify(specFunction).fromMetaStore();
+    inOrder.verify(tableFunction).fromMetaStore();
+  }
+}
\ No newline at end of file
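
This test pins down an ordering contract for HiveWrapper.Tuple (a 73-line class
added elsewhere in this commit but not shown in this message): the replication
state must be read from the metastore before the object definition, presumably
so a bootstrap dump never tags an object with a replication id newer than the
definition it accompanies. A hypothetical sketch of a constructor that the
mocks above would satisfy; only the call order is established by the test:

    public class HiveWrapper {
      public static class Tuple<T> {
        public interface Function<S> {
          S fromMetaStore() throws HiveException;
        }

        public final ReplicationSpec replicationSpec;
        public final T object;

        Tuple(Function<ReplicationSpec> specFunction, Function<T> objectFunction)
            throws HiveException {
          // Capture the replication state id first, then fetch the object.
          this.replicationSpec = specFunction.fromMetaStore();
          this.object = objectFunction.fromMetaStore();
        }
      }
    }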

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
index d44cb79..4b802c4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
@@ -44,7 +44,7 @@ public class TestEventHandlerFactory {
       }
 
       @Override
-      public ReplicationSemanticAnalyzer.DUMPTYPE dumpType() {
+      public DumpType dumpType() {
         return null;
       }
     }


[2/2] hive git commit: HIVE-16267 : Enable bootstrap function metadata to be loaded in repl load (Anishek Agarwal, reviewed by Sushanth Sowmyan)

Posted by kh...@apache.org.
HIVE-16267 : Enable bootstrap function metadata to be loaded in repl load (Anishek Agarwal, reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e9356b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e9356b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e9356b5

Branch: refs/heads/master
Commit: 9e9356b5e2cd03ff327ac1b269983454118e5f8e
Parents: f56abb4
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Thu May 4 01:37:59 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Thu May 4 02:49:27 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 187 ++++++------
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   1 -
 .../apache/hadoop/hive/ql/parse/EximUtil.java   | 157 ++--------
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |   5 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 305 ++++++-------------
 .../hadoop/hive/ql/parse/repl/DumpType.java     |  45 +++
 .../dump/BootStrapReplicationSpecFunction.java  |  54 ++++
 .../hive/ql/parse/repl/dump/DBSerializer.java   |  54 ----
 .../ql/parse/repl/dump/FunctionSerializer.java  |  48 ---
 .../hive/ql/parse/repl/dump/HiveWrapper.java    |  73 +++++
 .../hive/ql/parse/repl/dump/JsonWriter.java     |  54 ----
 .../ql/parse/repl/dump/PartitionSerializer.java |  64 ----
 .../repl/dump/ReplicationSpecSerializer.java    |  36 ---
 .../ql/parse/repl/dump/TableSerializer.java     | 113 -------
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   |  50 +++
 .../repl/dump/VersionCompatibleSerializer.java  |  37 ---
 .../ql/parse/repl/dump/io/DBSerializer.java     |  55 ++++
 .../parse/repl/dump/io/FunctionSerializer.java  |  49 +++
 .../hive/ql/parse/repl/dump/io/JsonWriter.java  |  55 ++++
 .../parse/repl/dump/io/PartitionSerializer.java |  65 ++++
 .../repl/dump/io/ReplicationSpecSerializer.java |  36 +++
 .../ql/parse/repl/dump/io/TableSerializer.java  | 114 +++++++
 .../dump/io/VersionCompatibleSerializer.java    |  37 +++
 .../parse/repl/events/AddPartitionHandler.java  |   6 +-
 .../repl/events/AlterPartitionHandler.java      |  28 +-
 .../ql/parse/repl/events/AlterTableHandler.java |  25 +-
 .../parse/repl/events/CreateTableHandler.java   |   6 +-
 .../ql/parse/repl/events/DefaultHandler.java    |   9 +-
 .../parse/repl/events/DropPartitionHandler.java |   9 +-
 .../ql/parse/repl/events/DropTableHandler.java  |   9 +-
 .../hive/ql/parse/repl/events/EventHandler.java |   6 +-
 .../ql/parse/repl/events/InsertHandler.java     |   9 +-
 .../hive/ql/parse/repl/load/DumpMetaData.java   | 143 +++++++++
 .../hive/ql/parse/repl/load/MetaData.java       |  64 ++++
 .../hive/ql/parse/repl/load/MetadataJson.java   | 128 ++++++++
 .../ql/parse/repl/dump/HiveWrapperTest.java     |  27 ++
 .../repl/events/TestEventHandlerFactory.java    |   4 +-
 37 files changed, 1273 insertions(+), 894 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index b3cbae0..5173d8b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -42,7 +42,9 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,17 +64,22 @@ import static org.junit.Assert.assertNull;
 
 public class TestReplicationScenarios {
 
-  final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener";
+  @Rule
+  public final TestName testName = new TestName();
+
+  private final static String DBNOTIF_LISTENER_CLASSNAME =
+      "org.apache.hive.hcatalog.listener.DbNotificationListener";
       // FIXME : replace with hive copy once that is copied
-  final static String tid =
+  private final static String tid =
       TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
-  final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid;
+  private final static String TEST_PATH =
+      System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
 
-  static HiveConf hconf;
-  static boolean useExternalMS = false;
-  static int msPort;
-  static Driver driver;
-  static HiveMetaStoreClient metaStoreClient;
+  private static HiveConf hconf;
+  private static boolean useExternalMS = false;
+  private static int msPort;
+  private static Driver driver;
+  private static HiveMetaStoreClient metaStoreClient;
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private ArrayList<String> lastResults;
@@ -141,6 +148,32 @@ public class TestReplicationScenarios {
     ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
   }
 
+  @Test
+  public void testFunctionReplicationAsPartOfBootstrap() throws IOException {
+    String dbName = createDB(testName.getMethodName());
+    run("CREATE FUNCTION " + dbName
+        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+        + "using jar  'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+
+    String replicatedDbName = loadAndVerify(dbName);
+    run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'");
+    verifyResults(new String[] { replicatedDbName + ".testFunction" });
+  }
+
+  private String loadAndVerify(String dbName) throws IOException {
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String dumpLocation = getResult(0, 0);
+    String lastReplicationId = getResult(0, 1, true);
+    String replicatedDbName = dbName + "_replicated";
+    run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    printOutput();
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
+    return replicatedDbName;
+  }
+
+
   /**
    * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
    * Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -149,12 +182,8 @@ public class TestReplicationScenarios {
    */
   @Test
   public void testBasic() throws IOException {
-
-    String testName = "basic";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -165,9 +194,9 @@ public class TestReplicationScenarios {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -182,31 +211,19 @@ public class TestReplicationScenarios {
     verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
     verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
 
-    advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0,0);
-    String replDumpId = getResult(0,1,true);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    String replicatedDbName = loadAndVerify(dbName);
 
-    verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId);
-
-    verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+    verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2);
     verifyRun("SELECT a from " + dbName + ".ptned_empty", empty);
     verifyRun("SELECT * from " + dbName + ".unptned_empty", empty);
   }
 
   @Test
   public void testBasicWithCM() throws Exception {
-
-    String testName = "basic_with_cm";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -218,10 +235,10 @@ public class TestReplicationScenarios {
     String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
-    String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+    String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -334,11 +351,8 @@ public class TestReplicationScenarios {
 
   @Test
   public void testIncrementalAdds() throws IOException {
-    String testName = "incrementalAdds";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -357,9 +371,9 @@ public class TestReplicationScenarios {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -421,11 +435,8 @@ public class TestReplicationScenarios {
   @Test
   public void testDrops() throws IOException {
 
-    String testName = "drops";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -436,9 +447,9 @@ public class TestReplicationScenarios {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -535,10 +546,7 @@ public class TestReplicationScenarios {
   public void testDropsWithCM() throws IOException {
 
     String testName = "drops_with_cm";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -661,10 +669,7 @@ public class TestReplicationScenarios {
   public void testAlters() throws IOException {
 
     String testName = "alters";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -846,10 +851,7 @@ public class TestReplicationScenarios {
   @Test
   public void testIncrementalLoad() throws IOException {
     String testName = "incrementalLoad";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -934,10 +936,7 @@ public class TestReplicationScenarios {
   @Test
   public void testIncrementalInserts() throws IOException {
     String testName = "incrementalInserts";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
 
     advanceDumpDir();
@@ -1062,10 +1061,7 @@ public class TestReplicationScenarios {
   @Test
   public void testViewsReplication() throws IOException {
     String testName = "viewsReplication";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -1142,11 +1138,8 @@ public class TestReplicationScenarios {
 
   @Test
   public void testDumpLimit() throws IOException {
-    String testName = "dumpLimit";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
 
     advanceDumpDir();
@@ -1530,11 +1523,8 @@ public class TestReplicationScenarios {
 
     // Now, to actually test status - first, we bootstrap.
 
-    String testName = "incrementalStatus";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     advanceDumpDir();
     run("REPL DUMP " + dbName);
     String lastReplDumpLocn = getResult(0, 0);
@@ -1589,6 +1579,13 @@ public class TestReplicationScenarios {
 
   }
 
+  private static String createDB(String name) {
+    LOG.info("Testing " + name);
+    String dbName = name + "_" + tid;
+    run("CREATE DATABASE " + dbName);
+    return dbName;
+  }
+
   @Test
   public void testEventFilters(){
     // Test testing that the filters introduced by EventUtils are working correctly.
@@ -1749,18 +1746,25 @@ public class TestReplicationScenarios {
     return (lastResults.get(rowNum).split("\\t"))[colNum];
   }
 
+  /**
+   * Results read from the hive output do not preserve case and come back all
+   * in lower case, so we check against lower-cased expected data values only.
+   * The exception is NULL values, which are returned in upper case, hence we
+   * explicitly lower-case both sides before asserting.
+   */
   private void verifyResults(String[] data) throws IOException {
     List<String> results = getOutput();
-    LOG.info("Expecting {}",data);
-    LOG.info("Got {}",results);
-    assertEquals(data.length,results.size());
-    for (int i = 0; i < data.length; i++){
-      assertEquals(data[i],results.get(i));
+    LOG.info("Expecting {}", data);
+    LOG.info("Got {}", results);
+    assertEquals(data.length, results.size());
+    for (int i = 0; i < data.length; i++) {
+      assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
     }
   }
 
   private List<String> getOutput() throws IOException {
-    List<String> results = new ArrayList<String>();
+    List<String> results = new ArrayList<>();
     try {
       driver.getResults(results);
     } catch (CommandNeedRetryException e) {
@@ -1848,5 +1852,4 @@ public class TestReplicationScenarios {
       }
     }
   }
-
 }
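
[Editor's aside on the case handling documented in verifyResults above: a minimal,
self-contained sketch of the comparison rule; the class name and literal values are
illustrative, not taken from the tests.]

    import static org.junit.Assert.assertEquals;

    import java.util.Arrays;
    import java.util.List;

    public class CaseInsensitiveCompare {
      public static void main(String[] args) {
        // Hypothetical driver output: regular values come back lower-cased,
        // while NULL is printed in upper case.
        List<String> results = Arrays.asList("eleven", "NULL");
        String[] expected = new String[]{ "Eleven", "null" };
        for (int i = 0; i < expected.length; i++) {
          // Lower-casing both sides is what makes the assertion pass for both rows.
          assertEquals(expected[i].toLowerCase(), results.get(i).toLowerCase());
        }
      }
    }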

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5b49dfd..5b908e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1437,7 +1437,6 @@ public class Hive {
    */
   public List<String> getTablesByType(String dbName, String pattern, TableType type)
       throws HiveException {
-    List<String> retList = new ArrayList<String>();
     if (dbName == null)
       dbName = SessionState.get().getCurrentDatabase();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 1ea5182..a9384be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -18,43 +18,36 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
-import com.google.common.base.Function;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
-import org.apache.hadoop.hive.ql.parse.repl.dump.ReplicationSpecSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.TableSerializer;
-import org.apache.thrift.TDeserializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.ReplicationSpecSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.TableSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.json.JSONArray;
 import org.json.JSONException;
-import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -270,124 +263,30 @@ public class EximUtil {
     }
   }
 
-  /**
-   * Utility class to help return complex value from readMetaData function
-   */
-  public static class ReadMetaData {
-    private final Database db;
-    private final Table table;
-    private final Iterable<Partition> partitions;
-    private final ReplicationSpec replicationSpec;
-
-    public ReadMetaData(){
-      this(null,null,null,new ReplicationSpec());
-    }
-    public ReadMetaData(Database db, Table table, Iterable<Partition> partitions, ReplicationSpec replicationSpec){
-      this.db = db;
-      this.table = table;
-      this.partitions = partitions;
-      this.replicationSpec = replicationSpec;
-    }
-
-    public Database getDatabase(){
-      return db;
-    }
-
-    public Table getTable() {
-      return table;
-    }
-
-    public Iterable<Partition> getPartitions() {
-      return partitions;
-    }
-
-    public ReplicationSpec getReplicationSpec() {
-      return replicationSpec;
-    }
-  };
-
-  public static ReadMetaData readMetaData(FileSystem fs, Path metadataPath)
+  static MetaData readMetaData(FileSystem fs, Path metadataPath)
       throws IOException, SemanticException {
-    FSDataInputStream mdstream = null;
+    String message = readAsString(fs, metadataPath);
     try {
-      mdstream = fs.open(metadataPath);
+      return new MetadataJson(message).getMetaData();
+    } catch (TException | JSONException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
+    }
+  }
+
+  private static String readAsString(final FileSystem fs, final Path fromMetadataPath)
+      throws IOException {
+    try (FSDataInputStream stream = fs.open(fromMetadataPath)) {
       byte[] buffer = new byte[1024];
       ByteArrayOutputStream sb = new ByteArrayOutputStream();
-      int read = mdstream.read(buffer);
+      int read = stream.read(buffer);
       while (read != -1) {
         sb.write(buffer, 0, read);
-        read = mdstream.read(buffer);
-      }
-      String md = new String(sb.toByteArray(), "UTF-8");
-      JSONObject jsonContainer = new JSONObject(md);
-      String version = jsonContainer.getString("version");
-      String fcversion = getJSONStringEntry(jsonContainer, "fcversion");
-      checkCompatibility(version, fcversion);
-
-      String dbDesc = getJSONStringEntry(jsonContainer, "db");
-      String tableDesc = getJSONStringEntry(jsonContainer,"table");
-      TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
-
-      Database db = null;
-      if (dbDesc != null){
-        db = new Database();
-        deserializer.deserialize(db, dbDesc, "UTF-8");
-      }
-
-      Table table = null;
-      List<Partition> partitionsList = null;
-      if (tableDesc != null){
-        table = new Table();
-        deserializer.deserialize(table, tableDesc, "UTF-8");
-        // TODO : jackson-streaming-iterable-redo this
-        JSONArray jsonPartitions = new JSONArray(jsonContainer.getString("partitions"));
-        partitionsList = new ArrayList<Partition>(jsonPartitions.length());
-        for (int i = 0; i < jsonPartitions.length(); ++i) {
-          String partDesc = jsonPartitions.getString(i);
-          Partition partition = new Partition();
-          deserializer.deserialize(partition, partDesc, "UTF-8");
-          partitionsList.add(partition);
-        }
-      }
-
-      return new ReadMetaData(db, table, partitionsList,readReplicationSpec(jsonContainer));
-    } catch (JSONException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
-    } catch (TException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
-    } finally {
-      if (mdstream != null) {
-        mdstream.close();
+        read = stream.read(buffer);
       }
+      return new String(sb.toByteArray(), "UTF-8");
     }
   }
 
-  private static ReplicationSpec readReplicationSpec(final JSONObject jsonContainer){
-    Function<String,String> keyFetcher = new Function<String, String>() {
-      @Override
-      public String apply(@Nullable String s) {
-        return getJSONStringEntry(jsonContainer,s);
-      }
-    };
-    return new ReplicationSpec(keyFetcher);
-  }
-
-  private static String getJSONStringEntry(JSONObject jsonContainer, String name) {
-    String retval = null;
-    try {
-      retval = jsonContainer.getString(name);
-    } catch (JSONException ignored) {}
-    return retval;
-  }
-
-  /* check the forward and backward compatibility */
-  private static void checkCompatibility(String version, String fcVersion) throws SemanticException {
-    doCheckCompatibility(
-        METADATA_FORMAT_VERSION,
-        version,
-        fcVersion);
-  }
-
   /* check the forward and backward compatibility */
   public static void doCheckCompatibility(String currVersion,
       String version, String fcVersion) throws SemanticException {

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 71d6074..dc86942 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -38,7 +37,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -60,6 +58,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -186,7 +185,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     FileSystem fs = FileSystem.get(fromURI, x.getConf());
     x.getInputs().add(toReadEntity(fromPath, x.getConf()));
 
-    EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+    MetaData rv = new MetaData();
     try {
       rv =  EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 37aa3ba..2daa123 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,77 +17,70 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.common.primitives.Ints;
 import org.antlr.runtime.tree.Tree;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.Predicate;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
 import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
@@ -120,154 +113,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
-  public static final String DUMPMETADATA = "_dumpmetadata";
-
-  public enum DUMPTYPE {
-    BOOTSTRAP("BOOTSTRAP"),
-    INCREMENTAL("INCREMENTAL"),
-    EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"),
-    EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"),
-    EVENT_DROP_TABLE("EVENT_DROP_TABLE"),
-    EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
-    EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
-    EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
-    EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
-    EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
-    EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
-    EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
-    EVENT_INSERT("EVENT_INSERT"),
-    EVENT_UNKNOWN("EVENT_UNKNOWN");
-
-    String type = null;
-    DUMPTYPE(String type) {
-      this.type = type;
-    }
-
-    @Override
-    public String toString(){
-      return type;
-    }
-
-  };
-
-  public static class DumpMetaData {
-    // wrapper class for reading and writing metadata about a dump
-    // responsible for _dumpmetadata files
-
-    private DUMPTYPE dumpType;
-    private Long eventFrom = null;
-    private Long eventTo = null;
-    private String payload = null;
-    private boolean initialized = false;
-
-    private final Path dumpRoot;
-    private final Path dumpFile;
-    private final HiveConf hiveConf;
-    private Path cmRoot;
-
-    public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
-      this.dumpRoot = dumpRoot;
-      this.hiveConf = hiveConf;
-      dumpFile = new Path(dumpRoot, DUMPMETADATA);
-    }
-
-    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot,
-        HiveConf hiveConf){
-      this(dumpRoot,hiveConf);
-      setDump(lvl, eventFrom, eventTo, cmRoot);
-    }
-
-    public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
-      this.dumpType = lvl;
-      this.eventFrom = eventFrom;
-      this.eventTo = eventTo;
-      this.initialized = true;
-      this.cmRoot = cmRoot;
-    }
-
-    public void loadDumpFromFile() throws SemanticException {
-      try {
-        // read from dumpfile and instantiate self
-        FileSystem fs = dumpFile.getFileSystem(hiveConf);
-        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
-        String line = null;
-        if ( (line = br.readLine()) != null){
-          String[] lineContents = line.split("\t", 5);
-          setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]),
-              new Path(lineContents[3]));
-          setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]);
-          ReplChangeManager.setCmRoot(cmRoot);
-        } else {
-          throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString());
-        }
-      } catch (IOException ioe){
-        throw new SemanticException(ioe);
-      }
-    }
-
-    public DUMPTYPE getDumpType() throws SemanticException {
-      initializeIfNot();
-      return this.dumpType;
-    }
-
-    public String getPayload() throws SemanticException {
-      initializeIfNot();
-      return this.payload;
-    }
-
-    public void setPayload(String payload) {
-      this.payload = payload;
-    }
-
-    public Long getEventFrom() throws SemanticException {
-      initializeIfNot();
-      return eventFrom;
-    }
-
-    public Long getEventTo() throws SemanticException {
-      initializeIfNot();
-      return eventTo;
-    }
-
-    public Path getCmRoot() {
-      return cmRoot;
-    }
-
-    public void setCmRoot(Path cmRoot) {
-      this.cmRoot = cmRoot;
-    }
-
-    public Path getDumpFilePath() {
-      return dumpFile;
-    }
-
-    public boolean isIncrementalDump() throws SemanticException {
-      initializeIfNot();
-      return (this.dumpType == DUMPTYPE.INCREMENTAL);
-    }
-
-    private void initializeIfNot() throws SemanticException {
-      if (!initialized){
-        loadDumpFromFile();
-      }
-    }
-
-    public void write() throws SemanticException {
-      writeOutput(
-          Arrays.asList(
-              dumpType.toString(),
-              eventFrom.toString(),
-              eventTo.toString(),
-              cmRoot.toString(),
-              payload),
-          dumpFile,
-          hiveConf
-      );
-    }
 
-  }
-
-  public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
+  ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
   }
 
@@ -387,7 +234,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         LOG.info(
             "Consolidation done, preparing to return {},{}->{}",
             dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
-        dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
+        dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
         dmd.write();
 
         // Set the correct last repl id to return to the user
@@ -433,10 +280,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
 
         LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
-        writeOutput(
-            Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(lastReplId)),
+        Utils.writeOutput(
+            Arrays.asList(
+                "incremental",
+                String.valueOf(eventFrom),
+                String.valueOf(lastReplId)
+            ),
             dmd.getDumpFilePath(), conf);
-        dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, lastReplId, cmRoot);
+        dmd.setDump(DumpType.INCREMENTAL, eventFrom, lastReplId, cmRoot);
         dmd.write();
       }
       prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
@@ -463,7 +314,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     testInjectDumpDir = dumpdir;
   }
 
-  String getNextDumpDir() {
+  private String getNextDumpDir() {
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
       // make it easy to write .q unit tests, instead of unique id generation.
       // however, this does mean that in writing tests, we have to be aware that
@@ -494,8 +345,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // TODO : instantiating FS objects is generally costly. Refactor
       FileSystem fs = dbRoot.getFileSystem(conf);
       Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
-      Database dbObj = db.getDatabase(dbName);
-      EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
+      HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
+      EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -513,9 +364,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
      // TODO : This should ideally return the Function objects and not Strings (function names); that conversion should be done by the caller. Look at this separately.
       List<String> functionNames = db.getFunctions(dbName, "*");
       for (String functionName : functionNames) {
-        org.apache.hadoop.hive.metastore.api.Function function =
-            db.getFunction(dbName, functionName);
-        if (function.getResourceUris().isEmpty()) {
+        HiveWrapper.Tuple<Function> tuple;
+        try {
+          tuple = new HiveWrapper(db, dbName).function(functionName);
+        } catch (HiveException e) {
+          // This can happen because we list the function names before fetching each
+          // function; a user may drop a function in between, making this call fail.
+          LOG.info("Function " + functionName + " could not be found, ignoring it since this can be a valid state", e);
+          continue;
+        }
+        if (tuple.object.getResourceUris().isEmpty()) {
           SESSION_STATE_LOG.warn(
               "Not replicating function: " + functionName + " as it seems to have been created "
                   + "without USING clause");
@@ -526,7 +384,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
             new Path(new Path(functionsRoot, functionName), FUNCTION_METADATA_DIR_NAME);
         try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf),
             functionMetadataRoot)) {
-          new FunctionSerializer(function).writeTo(jsonWriter, getNewReplicationSpec());
+          new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec);
         }
       }
     } catch (Exception e) {
@@ -738,7 +596,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                 taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
             taskChainTail = barrierTask;
             evstage++;
-            lastEvid = dmd.eventTo;
+            lastEvid = dmd.getEventTo();
           }
         }
 
@@ -1121,7 +979,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // associated with that
       // Then, we iterate over all subdirs, and create table imports for each.
 
-      EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+      MetaData rv = new MetaData();
       try {
         rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
       } catch (IOException e) {
@@ -1163,15 +1021,67 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       rootTasks.add(dbRootTask);
       FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
 
-      for (FileStatus tableDir : dirsInDbPath) {
+      for (FileStatus tableDir : Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) {
         analyzeTableLoad(
             dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null);
       }
+
+      // Function load
+      Path functionMetaDataRoot = new Path(dir.getPath(), FUNCTIONS_ROOT_DIR_NAME);
+      if (fs.exists(functionMetaDataRoot)) {
+        List<FileStatus> functionDirectories =
+            Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs)));
+        for (FileStatus functionDir : functionDirectories) {
+          analyzeFunctionLoad(dbName, functionDir, dbRootTask);
+        }
+      }
     } catch (Exception e) {
       throw new SemanticException(e);
     }
   }
 
+  private static class TableDirPredicate implements Predicate<FileStatus> {
+    @Override
+    public boolean apply(FileStatus fileStatus) {
+      return !fileStatus.getPath().getName().contains(FUNCTIONS_ROOT_DIR_NAME);
+    }
+  }
+
+  private void analyzeFunctionLoad(String dbName, FileStatus functionDir,
+      Task<? extends Serializable> createDbTask) throws IOException, SemanticException {
+    URI fromURI = EximUtil
+        .getValidatedURI(conf, stripQuotes(functionDir.getPath().toUri().toString()));
+    Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+    FileSystem fs = FileSystem.get(fromURI, conf);
+    inputs.add(toReadEntity(fromPath, conf));
+
+    try {
+      MetaData metaData = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+      ReplicationSpec replicationSpec = metaData.getReplicationSpec();
+      if (replicationSpec.isNoop()) {
+        // nothing to do here, silently return.
+        return;
+      }
+      CreateFunctionDesc desc = new CreateFunctionDesc(
+          dbName + "." + metaData.function.getFunctionName(),
+          false,
+          metaData.function.getClassName(),
+          metaData.function.getResourceUris()
+      );
+
+      Task<FunctionWork> currentTask = TaskFactory.get(new FunctionWork(desc), conf);
+      if (createDbTask != null) {
+        createDbTask.addDependentTask(currentTask);
+        LOG.debug("Added {}:{} as a precursor of {}:{}",
+            createDbTask.getClass(), createDbTask.getId(), currentTask.getClass(),
+            currentTask.getId());
+      }
+    } catch (IOException e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+  }
+
   private List<Task<? extends Serializable>> analyzeTableLoad(
       String dbName, String tblName, String locn,
       Task<? extends Serializable> precursor,
@@ -1270,27 +1180,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       LOG.debug("    > " + s);
     }
     ctx.setResFile(ctx.getLocalTmpPath());
-    writeOutput(values, ctx.getResFile(), conf);
-  }
-
-  private static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
-      throws SemanticException {
-    FileSystem fs = null;
-    DataOutputStream outStream = null;
-    try {
-      fs = outputFile.getFileSystem(hiveConf);
-      outStream = fs.create(outputFile);
-      outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
-      for (int i = 1; i < values.size(); i++) {
-        outStream.write(Utilities.tabCode);
-        outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
-      }
-      outStream.write(Utilities.newLineCode);
-    } catch (IOException e) {
-      throw new SemanticException(e);
-    } finally {
-      IOUtils.closeStream(outStream);
-    }
+    Utils.writeOutput(values, ctx.getResFile(), conf);
   }
 
   private ReplicationSpec getNewReplicationSpec() throws SemanticException {
@@ -1327,14 +1217,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase();
 
   static Iterable<String> removeValuesTemporaryTables(List<String> tableNames) {
-    List<String> allTables = new ArrayList<>(tableNames);
-    CollectionUtils.filter(allTables, new Predicate() {
-      @Override
-      public boolean evaluate(Object tableName) {
-        return !tableName.toString().toLowerCase().startsWith(TMP_TABLE_PREFIX);
-      }
-    });
-    return allTables;
+    return Collections2.filter(tableNames,
+        tableName -> {
+          assert tableName != null;
+          return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX);
+        });
   }
 
   private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
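
[Editor's aside on the Guava swap in removeValuesTemporaryTables above: unlike the
removed CollectionUtils.filter call, which mutated a copied ArrayList in place,
Collections2.filter returns a live, lazily evaluated view of the backing collection.
A minimal sketch; the prefix literal is illustrative, since the real code derives it
from SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.]

    import com.google.common.collect.Collections2;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    public class FilterViewDemo {
      public static void main(String[] args) {
        List<String> tables = Arrays.asList("values__tmp__table__1", "ptned", "unptned");
        // Returns a view computed on each iteration rather than a filtered copy,
        // so it tracks later changes to 'tables'.
        Collection<String> visible = Collections2.filter(tables,
            name -> name != null && !name.toLowerCase().startsWith("values__tmp__table__"));
        System.out.println(visible); // [ptned, unptned]
      }
    }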

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
new file mode 100644
index 0000000..b1df5a3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl;
+
+public enum DumpType {
+  BOOTSTRAP("BOOTSTRAP"),
+  INCREMENTAL("INCREMENTAL"),
+  EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"),
+  EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"),
+  EVENT_DROP_TABLE("EVENT_DROP_TABLE"),
+  EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
+  EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
+  EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
+  EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
+  EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
+  EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
+  EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
+  EVENT_INSERT("EVENT_INSERT"),
+  EVENT_UNKNOWN("EVENT_UNKNOWN");
+
+  String type = null;
+  DumpType(String type) {
+    this.type = type;
+  }
+
+  @Override
+  public String toString(){
+    return type;
+  }
+}
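
[Editor's aside: one property the dump file format relies on (the removed
DumpMetaData above writes dumpType.toString() and reads it back with valueOf) is
that every constant's type string equals its enum name, so toString() and valueOf
round-trip. A quick check, as a sketch:]

    for (DumpType t : DumpType.values()) {
      // Holds because each constant is declared with a string identical to its name.
      if (DumpType.valueOf(t.toString()) != t) {
        throw new AssertionError(t + " does not round-trip");
      }
    }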

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
new file mode 100644
index 0000000..ae37c73
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<ReplicationSpec> {
+  private final Hive db;
+
+  BootStrapReplicationSpecFunction(Hive db) {
+    this.db = db;
+  }
+
+  @Override
+  public ReplicationSpec fromMetaStore() throws HiveException {
+    try {
+      ReplicationSpec replicationSpec =
+          new ReplicationSpec(
+              true,
+              false,
+              "replv2",
+              "will-be-set",
+              false,
+              true,
+              false
+          );
+      long currentNotificationId = db.getMSC()
+          .getCurrentNotificationEventId().getEventId();
+      replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId));
+      return replicationSpec;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+      // TODO : simple wrap & rethrow for now, clean up with error codes
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
deleted file mode 100644
index 40770de..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-
-public class DBSerializer implements JsonWriter.Serializer {
-  private final Database dbObject;
-
-  public DBSerializer(Database dbObject) {
-    this.dbObject = dbObject;
-  }
-
-  @Override
-  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    dbObject.putToParameters(
-        ReplicationSpec.KEY.CURR_STATE_ID.toString(),
-        additionalPropertiesProvider.getCurrentReplicationState()
-    );
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    try {
-      String value = serializer.toString(dbObject, "UTF-8");
-      writer.jsonGenerator.writeStringField("db", value);
-    } catch (TException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
deleted file mode 100644
index 6b03766..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-
-public class FunctionSerializer implements JsonWriter.Serializer {
-  private Function function;
-
-  public FunctionSerializer(Function function) {
-    this.function = function;
-  }
-
-  @Override
-  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    try {
-      writer.jsonGenerator
-          .writeStringField("function", serializer.toString(function, "UTF-8"));
-    } catch (TException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
new file mode 100644
index 0000000..1dcaec2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+/**
+ * Wrapper that enforces the ordering of metastore calls: the replication id has
+ * to be queried from the db before any query that fetches the object itself
+ * (table/function/partition etc.), so all such ordered access is routed through here.
+ */
+
+public class HiveWrapper {
+  private final Hive db;
+  private final String dbName;
+  private final BootStrapReplicationSpecFunction functionForSpec;
+
+  public HiveWrapper(Hive db, String dbName) {
+    this.dbName = dbName;
+    this.db = db;
+    this.functionForSpec = new BootStrapReplicationSpecFunction(db);
+  }
+
+  public Tuple<org.apache.hadoop.hive.metastore.api.Function> function(final String name)
+      throws HiveException {
+    return new Tuple<>(functionForSpec, () -> db.getFunction(dbName, name));
+  }
+
+  public Tuple<Database> database() throws HiveException {
+    return new Tuple<>(functionForSpec, () -> db.getDatabase(dbName));
+  }
+
+  public static class Tuple<T> {
+
+    interface Function<T> {
+      T fromMetaStore() throws HiveException;
+    }
+
+    public final ReplicationSpec replicationSpec;
+    public final T object;
+
+    /**
+     * The ReplicationSpec has to be obtained before the object is queried from the
+     * hive metastore, since spec creation captures the latest event id for replication
+     * and we don't want to miss any events. We are therefore ok replaying some events
+     * as part of incremental load to achieve a consistent state of the warehouse.
+     */
+    Tuple(Function<ReplicationSpec> replicationSpecFunction,
+        Function<T> functionForObject) throws HiveException {
+      this.replicationSpec = replicationSpecFunction.fromMetaStore();
+      this.object = functionForObject.fromMetaStore();
+    }
+  }
+}
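
[Editor's aside: a hedged usage sketch of the ordering this class enforces, mirroring
the dump path in ReplicationSemanticAnalyzer above; the jsonWriter setup and error
handling are assumed, and FunctionSerializer is the io-package class this patch adds.]

    HiveWrapper.Tuple<Function> tuple = new HiveWrapper(db, dbName).function(functionName);
    // tuple.replicationSpec was captured before tuple.object was fetched, so the
    // recorded event id is never newer than the object's state; replaying the small
    // gap during incremental load keeps the warehouse consistent.
    new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec);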

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
deleted file mode 100644
index 1aa1195..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_VERSION;
-
-public class JsonWriter implements Closeable {
-
-  final JsonGenerator jsonGenerator;
-
-  public JsonWriter(FileSystem fs, Path writePath) throws IOException {
-    OutputStream out = fs.create(writePath);
-    jsonGenerator = new JsonFactory().createJsonGenerator(out);
-    jsonGenerator.writeStartObject();
-    jsonGenerator.writeStringField("version", METADATA_FORMAT_VERSION);
-  }
-
-  @Override
-  public void close() throws IOException {
-    jsonGenerator.writeEndObject();
-    jsonGenerator.close();
-  }
-
-  public interface Serializer {
-    void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws
-        SemanticException, IOException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
deleted file mode 100644
index 313d108..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-import java.util.Map;
-
-class PartitionSerializer implements JsonWriter.Serializer {
-  private Partition partition;
-
-  PartitionSerializer(Partition partition) {
-    this.partition = partition;
-  }
-
-  @Override
-  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    try {
-      if (additionalPropertiesProvider.isInReplicationScope()) {
-        partition.putToParameters(
-            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
-            additionalPropertiesProvider.getCurrentReplicationState());
-        if (isPartitionExternal()) {
-          // Replication destination will not be external
-          partition.putToParameters("EXTERNAL", "FALSE");
-        }
-      }
-      writer.jsonGenerator.writeString(serializer.toString(partition, "UTF-8"));
-      writer.jsonGenerator.flush();
-    } catch (TException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
-    }
-  }
-
-  private boolean isPartitionExternal() {
-    Map<String, String> params = partition.getParameters();
-    return params.containsKey("EXTERNAL")
-        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
deleted file mode 100644
index d88a553..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-
-import java.io.IOException;
-
-public class ReplicationSpecSerializer implements JsonWriter.Serializer {
-  @Override
-  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()) {
-      String value = additionalPropertiesProvider.get(key);
-      if (value != null) {
-        writer.jsonGenerator.writeStringField(key.toString(), value);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
deleted file mode 100644
index a2e258f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class TableSerializer implements JsonWriter.Serializer {
-  private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
-  private final Iterable<Partition> partitions;
-
-  public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle,
-      Iterable<Partition> partitions) {
-    this.tableHandle = tableHandle;
-    this.partitions = partitions;
-  }
-
-  @Override
-  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    if (cannotReplicateTable(additionalPropertiesProvider)) {
-      return;
-    }
-
-    Table tTable = tableHandle.getTTable();
-    tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
-    try {
-      TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-      writer.jsonGenerator
-          .writeStringField("table", serializer.toString(tTable, "UTF-8"));
-      writer.jsonGenerator.writeFieldName("partitions");
-      writePartitions(writer, additionalPropertiesProvider);
-    } catch (TException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
-    }
-  }
-
-  private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvider) {
-    return tableHandle == null || additionalPropertiesProvider.isNoop();
-  }
-
-  private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    if (additionalPropertiesProvider.isInReplicationScope()) {
-      table.putToParameters(
-            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
-            additionalPropertiesProvider.getCurrentReplicationState());
-      if (isExternalTable(table)) {
-        // Replication destination will not be external - override if set
-        table.putToParameters("EXTERNAL", "FALSE");
-      }
-      if (isExternalTableType(table)) {
-        // Replication dest will not be external - override if set
-        table.setTableType(TableType.MANAGED_TABLE.toString());
-      }
-    } else {
-      // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
-      // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
-      // TODO: if we want to be explicit about this dump not being a replication dump, we can
-      // uncomment this else section, but currently unneeded. Will require a lot of golden file
-      // regen if we do so.
-    }
-    return table;
-  }
-
-  private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) {
-    return table.isSetTableType()
-        && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
-  }
-
-  private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) {
-    Map<String, String> params = table.getParameters();
-    return params.containsKey("EXTERNAL")
-        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
-  }
-
-  private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    writer.jsonGenerator.writeStartArray();
-    if (partitions != null) {
-      for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
-        new PartitionSerializer(partition.getTPartition())
-            .writeTo(writer, additionalPropertiesProvider);
-      }
-    }
-    writer.jsonGenerator.writeEndArray();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
new file mode 100644
index 0000000..846b6f5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class Utils {
+  public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
+      throws SemanticException {
+    DataOutputStream outStream = null;
+    try {
+      FileSystem fs = outputFile.getFileSystem(hiveConf);
+      outStream = fs.create(outputFile);
+      outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+      for (int i = 1; i < values.size(); i++) {
+        outStream.write(Utilities.tabCode);
+        outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+      }
+      outStream.write(Utilities.newLineCode);
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    } finally {
+      IOUtils.closeStream(outStream);
+    }
+  }
+}

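For context, a minimal sketch of how a dump task might call the new Utils.writeOutput
helper; the class name, path, and values here are illustrative placeholders, not part
of this patch. The helper writes the list as one tab-separated record terminated by a
newline, substituting Utilities.nullStringOutput for null entries.

import java.util.Arrays;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;

public class WriteOutputSketch {
  public static void main(String[] args) throws SemanticException {
    HiveConf conf = new HiveConf();
    // Writes "dumpDir\t100\n" to the target file on whichever FileSystem
    // the (placeholder) path resolves against.
    Utils.writeOutput(Arrays.asList("dumpDir", "100"),
        new Path("/tmp/hive/repl/_dump_result"), conf);
  }
}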
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
deleted file mode 100644
index 3ebc803..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-
-import java.io.IOException;
-
-import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION;
-
-/**
- * This is not used as of now, as the conditional that led to its usage is always false;
- * hence we have removed the conditional and the usage of this class, but it might be required in future.
- */
-public class VersionCompatibleSerializer implements JsonWriter.Serializer {
-  @Override
-  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
-    writer.jsonGenerator.writeStringField("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
new file mode 100644
index 0000000..15b7e13
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+
+public class DBSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME = "db";
+  private final Database dbObject;
+
+  public DBSerializer(Database dbObject) {
+    this.dbObject = dbObject;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    dbObject.putToParameters(
+        ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+        additionalPropertiesProvider.getCurrentReplicationState()
+    );
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    try {
+      String value = serializer.toString(dbObject, UTF_8);
+      writer.jsonGenerator.writeStringField(FIELD_NAME, value);
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+}
+
+

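A hypothetical dump-side sketch showing how DBSerializer composes with JsonWriter;
the method name, path, and Database instance are assumptions for illustration. The
serializer stamps CURR_STATE_ID into the database parameters and writes the
Thrift-JSON payload under the "db" field of the object JsonWriter has open.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;

public class DbDumpSketch {
  public static void dumpDb(Database db, Path metadataPath, ReplicationSpec spec,
      HiveConf conf) throws Exception {
    FileSystem fs = metadataPath.getFileSystem(conf);
    // JsonWriter opens the JSON object and writes the "version" field up front;
    // close() (via try-with-resources) ends the object.
    try (JsonWriter writer = new JsonWriter(fs, metadataPath)) {
      new DBSerializer(db).writeTo(writer, spec);
    }
  }
}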
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
new file mode 100644
index 0000000..5dc7023
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+
+public class FunctionSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME="function";
+  private Function function;
+
+  public FunctionSerializer(Function function) {
+    this.function = function;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    try {
+      writer.jsonGenerator
+          .writeStringField(FIELD_NAME, serializer.toString(function, UTF_8));
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+}

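A load-side counterpart sketch (hypothetical, not part of this patch): the string
written under FunctionSerializer.FIELD_NAME is plain Thrift JSON, so repl load can
rebuild the Function object with a TDeserializer using the same protocol factory.

import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TJSONProtocol;

public class FunctionLoadSketch {
  // thriftJson is assumed to be the value of the "function" field read back
  // from the dumped _metadata file.
  public static Function fromJson(String thriftJson) throws TException {
    TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
    Function fn = new Function();
    deserializer.deserialize(fn, thriftJson, "UTF-8");
    return fn;
  }
}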
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java
new file mode 100644
index 0000000..e20be68
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_VERSION;
+
+public class JsonWriter implements Closeable {
+
+  final JsonGenerator jsonGenerator;
+
+  public JsonWriter(FileSystem fs, Path writePath) throws IOException {
+    OutputStream out = fs.create(writePath);
+    jsonGenerator = new JsonFactory().createJsonGenerator(out);
+    jsonGenerator.writeStartObject();
+    jsonGenerator.writeStringField("version", METADATA_FORMAT_VERSION);
+  }
+
+  @Override
+  public void close() throws IOException {
+    jsonGenerator.writeEndObject();
+    jsonGenerator.close();
+  }
+
+  public interface Serializer {
+    String UTF_8 = "UTF-8";
+    void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws
+        SemanticException, IOException;
+  }
+}
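
Taken together: JsonWriter opens one JSON object per metadata file and writes the
"version" field in its constructor; each Serializer then contributes one named
field; close() ends the object. A hypothetical extra serializer would look like
the sketch below; note it must live in the same io package, since the
jsonGenerator field is package-private.

package org.apache.hadoop.hive.ql.parse.repl.dump.io;

import java.io.IOException;

import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// Illustrative only, not part of this patch.
public class ExampleSerializer implements JsonWriter.Serializer {
  @Override
  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
      throws SemanticException, IOException {
    writer.jsonGenerator.writeStringField("example", "value");
  }
}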