Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/04 09:49:31 UTC
[1/2] hive git commit: HIVE-16267 : Enable bootstrap function metadata to be loaded in repl load (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master f56abb405 -> 9e9356b5e
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
new file mode 100644
index 0000000..077d39b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class PartitionSerializer implements JsonWriter.Serializer {
+ public static final String FIELD_NAME = "partitions";
+ private Partition partition;
+
+ PartitionSerializer(Partition partition) {
+ this.partition = partition;
+ }
+
+ @Override
+ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ try {
+ if (additionalPropertiesProvider.isInReplicationScope()) {
+ partition.putToParameters(
+ ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ additionalPropertiesProvider.getCurrentReplicationState());
+ if (isPartitionExternal()) {
+ // Replication destination will not be external
+ partition.putToParameters("EXTERNAL", "FALSE");
+ }
+ }
+ writer.jsonGenerator.writeString(serializer.toString(partition, UTF_8));
+ writer.jsonGenerator.flush();
+ } catch (TException e) {
+ throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+ }
+ }
+
+ private boolean isPartitionExternal() {
+ Map<String, String> params = partition.getParameters();
+ return params.containsKey("EXTERNAL")
+ && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
+ }
+}
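The serializer above leans on Thrift's JSON protocol to encode the metastore Partition as a string. A minimal sketch of that round trip, assuming the libthrift 0.9.x-era TSerializer/TDeserializer API used here; the partition instance is illustrative:

    // Encode a thrift object as a JSON string, as writeTo() does above.
    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
    String json = serializer.toString(partition, "UTF-8");

    // Decode it back into a fresh object, as the repl load side does.
    TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
    Partition copy = new Partition();
    deserializer.deserialize(copy, json, "UTF-8");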
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java
new file mode 100644
index 0000000..3a92e8a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.IOException;
+
+public class ReplicationSpecSerializer implements JsonWriter.Serializer {
+ @Override
+ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()) {
+ String value = additionalPropertiesProvider.get(key);
+ if (value != null) {
+ writer.jsonGenerator.writeStringField(key.toString(), value);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
new file mode 100644
index 0000000..948cb39
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TableSerializer implements JsonWriter.Serializer {
+ public static final String FIELD_NAME = "table";
+ private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
+ private final Iterable<Partition> partitions;
+
+ public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle,
+ Iterable<Partition> partitions) {
+ this.tableHandle = tableHandle;
+ this.partitions = partitions;
+ }
+
+ @Override
+ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ if (cannotReplicateTable(additionalPropertiesProvider)) {
+ return;
+ }
+
+ Table tTable = tableHandle.getTTable();
+ tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
+ try {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ writer.jsonGenerator
+ .writeStringField(FIELD_NAME, serializer.toString(tTable, UTF_8));
+ writer.jsonGenerator.writeFieldName(PartitionSerializer.FIELD_NAME);
+ writePartitions(writer, additionalPropertiesProvider);
+ } catch (TException e) {
+ throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+ }
+ }
+
+ private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvider) {
+ return tableHandle == null || additionalPropertiesProvider.isNoop();
+ }
+
+ private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ if (additionalPropertiesProvider.isInReplicationScope()) {
+ table.putToParameters(
+ ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ additionalPropertiesProvider.getCurrentReplicationState());
+ if (isExternalTable(table)) {
+ // Replication destination will not be external - override if set
+ table.putToParameters("EXTERNAL", "FALSE");
+ }
+ if (isExternalTableType(table)) {
+ // Replication dest will not be external - override if set
+ table.setTableType(TableType.MANAGED_TABLE.toString());
+ }
+ } else {
+ // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
+ // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
+ // TODO: if we want to be explicit about this dump not being a replication dump, we can
+ // uncomment this else section, but it is currently unneeded. Will require a lot of golden file
+ // regen if we do so.
+ }
+ return table;
+ }
+
+ private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) {
+ return table.isSetTableType()
+ && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
+ }
+
+ private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) {
+ Map<String, String> params = table.getParameters();
+ return params.containsKey("EXTERNAL")
+ && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
+ }
+
+ private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ writer.jsonGenerator.writeStartArray();
+ if (partitions != null) {
+ for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
+ new PartitionSerializer(partition.getTPartition())
+ .writeTo(writer, additionalPropertiesProvider);
+ }
+ }
+ writer.jsonGenerator.writeEndArray();
+ }
+}
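Taken together, TableSerializer and PartitionSerializer write a metadata document in which the table is a single Thrift-JSON string field and the partitions are an array of such strings. Roughly the following shape; the layout is a sketch inferred from the writeStringField/writeStartArray calls above, with the embedded values elided:

    {
      "version" : "<METADATA_FORMAT_VERSION>",
      "table" : "<Thrift-JSON encoding of the Table>",
      "partitions" : [ "<Thrift-JSON Partition>", "<Thrift-JSON Partition>" ]
    }

Any ReplicationSpec keys emitted by ReplicationSpecSerializer appear as additional top-level string fields.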
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java
new file mode 100644
index 0000000..8201173
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION;
+
+/**
+ * This class is currently unused: the conditional that led to its use was always false,
+ * so the conditional and the usage of this class have been removed, but it might be required in future.
+ */
+public class VersionCompatibleSerializer implements JsonWriter.Serializer {
+ @Override
+ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ writer.jsonGenerator.writeStringField("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
index 9a4f8b9..1616ab9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
@@ -35,7 +35,7 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Iterator;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
public class AddPartitionHandler extends AbstractHandler {
protected AddPartitionHandler(NotificationEvent notificationEvent) {
@@ -108,7 +108,7 @@ public class AddPartitionHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_ADD_PARTITION;
+ public DumpType dumpType() {
+ return DumpType.EVENT_ADD_PARTITION;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
index 20d04dc..b6c3496 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
@@ -23,14 +23,14 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
public class AlterPartitionHandler extends AbstractHandler {
private final org.apache.hadoop.hive.metastore.api.Partition after;
@@ -51,24 +51,24 @@ public class AlterPartitionHandler extends AbstractHandler {
private enum Scenario {
ALTER {
@Override
- DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_ALTER_PARTITION;
+ DumpType dumpType() {
+ return DumpType.EVENT_ALTER_PARTITION;
}
},
RENAME {
@Override
- DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_RENAME_PARTITION;
+ DumpType dumpType() {
+ return DumpType.EVENT_RENAME_PARTITION;
}
},
TRUNCATE {
@Override
- DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_TRUNCATE_PARTITION;
+ DumpType dumpType() {
+ return DumpType.EVENT_TRUNCATE_PARTITION;
}
};
- abstract DUMPTYPE dumpType();
+ abstract DumpType dumpType();
}
private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
@@ -90,14 +90,14 @@ public class AlterPartitionHandler extends AbstractHandler {
if (Scenario.ALTER == scenario) {
withinContext.replicationSpec.setIsMetadataOnly(true);
Table qlMdTable = new Table(tableObject);
- List<Partition> qlPtns = new ArrayList<>();
- qlPtns.add(new Partition(qlMdTable, after));
+ List<Partition> partitions = new ArrayList<>();
+ partitions.add(new Partition(qlMdTable, after));
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
EximUtil.createExportDump(
metaDataPath.getFileSystem(withinContext.hiveConf),
metaDataPath,
qlMdTable,
- qlPtns,
+ partitions,
withinContext.replicationSpec);
}
DumpMetaData dmd = withinContext.createDmd(this);
@@ -106,7 +106,7 @@ public class AlterPartitionHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
+ public DumpType dumpType() {
return scenario.dumpType();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
index bfe0181..d553240 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
@@ -22,13 +22,12 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
public class AlterTableHandler extends AbstractHandler {
- private final org.apache.hadoop.hive.metastore.api.Table before;
private final org.apache.hadoop.hive.metastore.api.Table after;
private final boolean isTruncateOp;
private final Scenario scenario;
@@ -36,30 +35,30 @@ public class AlterTableHandler extends AbstractHandler {
private enum Scenario {
ALTER {
@Override
- DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_ALTER_TABLE;
+ DumpType dumpType() {
+ return DumpType.EVENT_ALTER_TABLE;
}
},
RENAME {
@Override
- DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_RENAME_TABLE;
+ DumpType dumpType() {
+ return DumpType.EVENT_RENAME_TABLE;
}
},
TRUNCATE {
@Override
- DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_TRUNCATE_TABLE;
+ DumpType dumpType() {
+ return DumpType.EVENT_TRUNCATE_TABLE;
}
};
- abstract DUMPTYPE dumpType();
+ abstract DumpType dumpType();
}
AlterTableHandler(NotificationEvent event) throws Exception {
super(event);
AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
- before = atm.getTableObjBefore();
+ org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore();
after = atm.getTableObjAfter();
isTruncateOp = atm.getIsTruncateOp();
scenario = scenarioType(before, after);
@@ -97,7 +96,7 @@ public class AlterTableHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
+ public DumpType dumpType() {
return scenario.dumpType();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
index 03f400d..88600fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
@@ -28,7 +28,7 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
public class CreateTableHandler extends AbstractHandler {
@@ -80,7 +80,7 @@ public class CreateTableHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_CREATE_TABLE;
+ public DumpType dumpType() {
+ return DumpType.EVENT_CREATE_TABLE;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
index 61c5f37..78cd74f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.parse.repl.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
public class DefaultHandler extends AbstractHandler {
@@ -37,7 +38,7 @@ public class DefaultHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_UNKNOWN;
+ public DumpType dumpType() {
+ return DumpType.EVENT_UNKNOWN;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
index 3ad794e..c4a0908 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.parse.repl.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
public class DropPartitionHandler extends AbstractHandler {
@@ -37,7 +38,7 @@ public class DropPartitionHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_DROP_PARTITION;
+ public DumpType dumpType() {
+ return DumpType.EVENT_DROP_PARTITION;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
index cae379b..e3addaf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.parse.repl.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
public class DropTableHandler extends AbstractHandler {
@@ -37,7 +38,7 @@ public class DropTableHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_DROP_TABLE;
+ public DumpType dumpType() {
+ return DumpType.EVENT_DROP_TABLE;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
index 199145a..29f3b42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
public interface EventHandler {
void handle(Context withinContext) throws Exception;
@@ -32,7 +32,7 @@ public interface EventHandler {
long toEventId();
- DUMPTYPE dumpType();
+ DumpType dumpType();
class Context {
final Path eventRoot, cmRoot;
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
index e9f2a6a..910b396 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
@@ -32,8 +32,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
public class InsertHandler extends AbstractHandler {
@@ -103,7 +104,7 @@ public class InsertHandler extends AbstractHandler {
}
@Override
- public DUMPTYPE dumpType() {
- return DUMPTYPE.EVENT_INSERT;
+ public DumpType dumpType() {
+ return DumpType.EVENT_INSERT;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
new file mode 100644
index 0000000..12ad19b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+
+public class DumpMetaData {
+ // wrapper class for reading and writing metadata about a dump
+ // responsible for _dumpmetadata files
+ private static final String DUMP_METADATA = "_dumpmetadata";
+
+ private DumpType dumpType;
+ private Long eventFrom = null;
+ private Long eventTo = null;
+ private String payload = null;
+ private boolean initialized = false;
+
+ private final Path dumpFile;
+ private final HiveConf hiveConf;
+ private Path cmRoot;
+
+ public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ dumpFile = new Path(dumpRoot, DUMP_METADATA);
+ }
+
+ public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot,
+ HiveConf hiveConf) {
+ this(dumpRoot, hiveConf);
+ setDump(lvl, eventFrom, eventTo, cmRoot);
+ }
+
+ public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot) {
+ this.dumpType = lvl;
+ this.eventFrom = eventFrom;
+ this.eventTo = eventTo;
+ this.initialized = true;
+ this.cmRoot = cmRoot;
+ }
+
+ private void loadDumpFromFile() throws SemanticException {
+ try {
+ // read from dumpfile and instantiate self
+ FileSystem fs = dumpFile.getFileSystem(hiveConf);
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
+ String line = null;
+ if ((line = br.readLine()) != null) {
+ String[] lineContents = line.split("\t", 5);
+ setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]),
+ Long.valueOf(lineContents[2]),
+ new Path(lineContents[3]));
+ setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]);
+ ReplChangeManager.setCmRoot(cmRoot);
+ } else {
+ throw new IOException(
+ "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString());
+ }
+ } catch (IOException ioe) {
+ throw new SemanticException(ioe);
+ }
+ }
+
+ public DumpType getDumpType() throws SemanticException {
+ initializeIfNot();
+ return this.dumpType;
+ }
+
+ public String getPayload() throws SemanticException {
+ initializeIfNot();
+ return this.payload;
+ }
+
+ public void setPayload(String payload) {
+ this.payload = payload;
+ }
+
+ public Long getEventFrom() throws SemanticException {
+ initializeIfNot();
+ return eventFrom;
+ }
+
+ public Long getEventTo() throws SemanticException {
+ initializeIfNot();
+ return eventTo;
+ }
+
+ public Path getDumpFilePath() {
+ return dumpFile;
+ }
+
+ public boolean isIncrementalDump() throws SemanticException {
+ initializeIfNot();
+ return (this.dumpType == DumpType.INCREMENTAL);
+ }
+
+ private void initializeIfNot() throws SemanticException {
+ if (!initialized) {
+ loadDumpFromFile();
+ }
+ }
+
+
+ public void write() throws SemanticException {
+ Utils.writeOutput(
+ Arrays.asList(
+ dumpType.toString(),
+ eventFrom.toString(),
+ eventTo.toString(),
+ cmRoot.toString(),
+ payload),
+ dumpFile,
+ hiveConf
+ );
+ }
+}
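The _dumpmetadata file written by write() is a single tab-separated line with five fields: dump type, from-event id, to-event id, cmRoot path, and payload. A sketch of what loadDumpFromFile() parses; the event ids and cmRoot path below are made up:

    // Illustrative line; a null payload is stored as Utilities.nullStringOutput.
    String line = "INCREMENTAL\t120\t250\thdfs://ns1/cmroot\t" + Utilities.nullStringOutput;
    String[] fields = line.split("\t", 5); // limit 5: the payload itself may contain tabs
    DumpType type = DumpType.valueOf(fields[0]); // INCREMENTAL
    Long eventFrom = Long.valueOf(fields[1]);    // 120L
    Long eventTo = Long.valueOf(fields[2]);      // 250L
    Path cmRoot = new Path(fields[3]);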
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
new file mode 100644
index 0000000..fc02dfd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+/**
+ * Utility class to help return complex value from readMetaData function
+ */
+public class MetaData {
+ private final Database db;
+ private final Table table;
+ private final Iterable<Partition> partitions;
+ private final ReplicationSpec replicationSpec;
+ public final Function function;
+
+ public MetaData() {
+ this(null, null, null, new ReplicationSpec(), null);
+ }
+
+ MetaData(Database db, Table table, Iterable<Partition> partitions,
+ ReplicationSpec replicationSpec, Function function) {
+ this.db = db;
+ this.table = table;
+ this.partitions = partitions;
+ this.replicationSpec = replicationSpec;
+ this.function = function;
+ }
+
+ public Database getDatabase() {
+ return db;
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ public Iterable<Partition> getPartitions() {
+ return partitions;
+ }
+
+ public ReplicationSpec getReplicationSpec() {
+ return replicationSpec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
new file mode 100644
index 0000000..b7a5680
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.PartitionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.TableSerializer;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter.Serializer.UTF_8;
+
+public class MetadataJson {
+ private final JSONObject json;
+ private final TDeserializer deserializer;
+ private final String tableDesc;
+
+ public MetadataJson(String message) throws JSONException, SemanticException {
+ deserializer = new TDeserializer(new TJSONProtocol.Factory());
+ json = new JSONObject(message);
+ checkCompatibility();
+ tableDesc = jsonEntry(TableSerializer.FIELD_NAME);
+ }
+
+ public MetaData getMetaData() throws TException, JSONException {
+ return new MetaData(
+ database(),
+ table(),
+ partitions(),
+ readReplicationSpec(),
+ function()
+ );
+ }
+
+ private Function function() throws TException {
+ return deserialize(new Function(), jsonEntry(FunctionSerializer.FIELD_NAME));
+ }
+
+ private Database database() throws TException {
+ return deserialize(new Database(), jsonEntry(DBSerializer.FIELD_NAME));
+ }
+
+ private Table table() throws TException {
+ return deserialize(new Table(), tableDesc);
+ }
+
+ private <T extends TBase> T deserialize(T intoObject, String json) throws TException {
+ if (json == null) {
+ return null;
+ }
+ deserializer.deserialize(intoObject, json, UTF_8);
+ return intoObject;
+ }
+
+ private List<Partition> partitions() throws JSONException, TException {
+ if (tableDesc == null) {
+ return null;
+ }
+ // TODO : jackson-streaming-iterable-redo this
+ JSONArray jsonPartitions = new JSONArray(json.getString(PartitionSerializer.FIELD_NAME));
+ List<Partition> partitionsList = new ArrayList<>(jsonPartitions.length());
+ for (int i = 0; i < jsonPartitions.length(); ++i) {
+ String partDesc = jsonPartitions.getString(i);
+ partitionsList.add(deserialize(new Partition(), partDesc));
+ }
+ return partitionsList;
+ }
+
+ private ReplicationSpec readReplicationSpec() {
+ com.google.common.base.Function<String, String> keyFetcher =
+ new com.google.common.base.Function<String, String>() {
+ @Override
+ public String apply(@Nullable String s) {
+ return jsonEntry(s);
+ }
+ };
+ return new ReplicationSpec(keyFetcher);
+ }
+
+ private void checkCompatibility() throws SemanticException, JSONException {
+ String version = json.getString("version");
+ String fcVersion = jsonEntry("fcversion");
+ EximUtil.doCheckCompatibility(
+ EximUtil.METADATA_FORMAT_VERSION,
+ version,
+ fcVersion);
+ }
+
+ private String jsonEntry(String forName) {
+ try {
+ return json.getString(forName);
+ } catch (JSONException ignored) {
+ return null;
+ }
+ }
+}
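jsonEntry() turns org.json's exception-on-missing-key behavior into an optional lookup: JSONObject.getString() throws JSONException when a field is absent, so absence maps to null rather than an error. A minimal sketch of the same pattern, with an illustrative document:

    JSONObject json = new JSONObject("{\"version\":\"1\"}");
    String fcVersion;
    try {
      fcVersion = json.getString("fcversion"); // absent in this document
    } catch (JSONException ignored) {
      fcVersion = null; // missing field treated as "not set"
    }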
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java
new file mode 100644
index 0000000..3028e76
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HiveWrapperTest {
+ @Mock
+ private HiveWrapper.Tuple.Function<ReplicationSpec> specFunction;
+ @Mock
+ private HiveWrapper.Tuple.Function<Table> tableFunction;
+
+ @Test
+ public void replicationIdIsRequestedBeforeObjectDefinition() throws HiveException {
+ new HiveWrapper.Tuple<>(specFunction, tableFunction);
+ InOrder inOrder = Mockito.inOrder(specFunction, tableFunction);
+ inOrder.verify(specFunction).fromMetaStore();
+ inOrder.verify(tableFunction).fromMetaStore();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
index d44cb79..4b802c4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hive.ql.parse.repl.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
@@ -44,7 +44,7 @@ public class TestEventHandlerFactory {
}
@Override
- public ReplicationSemanticAnalyzer.DUMPTYPE dumpType() {
+ public DumpType dumpType() {
return null;
}
}
[2/2] hive git commit: HIVE-16267 : Enable bootstrap function metadata to be loaded in repl load (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Posted by kh...@apache.org.
HIVE-16267 : Enable bootstrap function metadata to be loaded in repl load (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e9356b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e9356b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e9356b5
Branch: refs/heads/master
Commit: 9e9356b5e2cd03ff327ac1b269983454118e5f8e
Parents: f56abb4
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Thu May 4 01:37:59 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Thu May 4 02:49:27 2017 -0700
----------------------------------------------------------------------
.../hive/ql/TestReplicationScenarios.java | 187 ++++++------
.../apache/hadoop/hive/ql/metadata/Hive.java | 1 -
.../apache/hadoop/hive/ql/parse/EximUtil.java | 157 ++--------
.../hive/ql/parse/ImportSemanticAnalyzer.java | 5 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 305 ++++++-------------
.../hadoop/hive/ql/parse/repl/DumpType.java | 45 +++
.../dump/BootStrapReplicationSpecFunction.java | 54 ++++
.../hive/ql/parse/repl/dump/DBSerializer.java | 54 ----
.../ql/parse/repl/dump/FunctionSerializer.java | 48 ---
.../hive/ql/parse/repl/dump/HiveWrapper.java | 73 +++++
.../hive/ql/parse/repl/dump/JsonWriter.java | 54 ----
.../ql/parse/repl/dump/PartitionSerializer.java | 64 ----
.../repl/dump/ReplicationSpecSerializer.java | 36 ---
.../ql/parse/repl/dump/TableSerializer.java | 113 -------
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 50 +++
.../repl/dump/VersionCompatibleSerializer.java | 37 ---
.../ql/parse/repl/dump/io/DBSerializer.java | 55 ++++
.../parse/repl/dump/io/FunctionSerializer.java | 49 +++
.../hive/ql/parse/repl/dump/io/JsonWriter.java | 55 ++++
.../parse/repl/dump/io/PartitionSerializer.java | 65 ++++
.../repl/dump/io/ReplicationSpecSerializer.java | 36 +++
.../ql/parse/repl/dump/io/TableSerializer.java | 114 +++++++
.../dump/io/VersionCompatibleSerializer.java | 37 +++
.../parse/repl/events/AddPartitionHandler.java | 6 +-
.../repl/events/AlterPartitionHandler.java | 28 +-
.../ql/parse/repl/events/AlterTableHandler.java | 25 +-
.../parse/repl/events/CreateTableHandler.java | 6 +-
.../ql/parse/repl/events/DefaultHandler.java | 9 +-
.../parse/repl/events/DropPartitionHandler.java | 9 +-
.../ql/parse/repl/events/DropTableHandler.java | 9 +-
.../hive/ql/parse/repl/events/EventHandler.java | 6 +-
.../ql/parse/repl/events/InsertHandler.java | 9 +-
.../hive/ql/parse/repl/load/DumpMetaData.java | 143 +++++++++
.../hive/ql/parse/repl/load/MetaData.java | 64 ++++
.../hive/ql/parse/repl/load/MetadataJson.java | 128 ++++++++
.../ql/parse/repl/dump/HiveWrapperTest.java | 27 ++
.../repl/events/TestEventHandlerFactory.java | 4 +-
37 files changed, 1273 insertions(+), 894 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index b3cbae0..5173d8b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -42,7 +42,9 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,17 +64,22 @@ import static org.junit.Assert.assertNull;
public class TestReplicationScenarios {
- final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener";
+ @Rule
+ public final TestName testName = new TestName();
+
+ private final static String DBNOTIF_LISTENER_CLASSNAME =
+ "org.apache.hive.hcatalog.listener.DbNotificationListener";
// FIXME : replace with hive copy once that is copied
- final static String tid =
+ private final static String tid =
TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
- final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid;
+ private final static String TEST_PATH =
+ System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
- static HiveConf hconf;
- static boolean useExternalMS = false;
- static int msPort;
- static Driver driver;
- static HiveMetaStoreClient metaStoreClient;
+ private static HiveConf hconf;
+ private static boolean useExternalMS = false;
+ private static int msPort;
+ private static Driver driver;
+ private static HiveMetaStoreClient metaStoreClient;
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private ArrayList<String> lastResults;
@@ -141,6 +148,32 @@ public class TestReplicationScenarios {
ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
}
+ @Test
+ public void testFunctionReplicationAsPartOfBootstrap() throws IOException {
+ String dbName = createDB(testName.getMethodName());
+ run("CREATE FUNCTION " + dbName
+ + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+ + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+
+ String replicatedDbName = loadAndVerify(dbName);
+ run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'");
+ verifyResults(new String[] { replicatedDbName + ".testFunction" });
+ }
+
+ private String loadAndVerify(String dbName) throws IOException {
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String dumpLocation = getResult(0, 0);
+ String lastReplicationId = getResult(0, 1, true);
+ String replicatedDbName = dbName + "_replicated";
+ run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ printOutput();
+ run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
+ return replicatedDbName;
+ }
+
+
/**
* Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
* Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -149,12 +182,8 @@ public class TestReplicationScenarios {
*/
@Test
public void testBasic() throws IOException {
-
- String testName = "basic";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -165,9 +194,9 @@ public class TestReplicationScenarios {
String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -182,31 +211,19 @@ public class TestReplicationScenarios {
verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
- advanceDumpDir();
- run("REPL DUMP " + dbName);
- String replDumpLocn = getResult(0,0);
- String replDumpId = getResult(0,1,true);
- run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
- printOutput();
- run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ String replicatedDbName = loadAndVerify(dbName);
- verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId);
-
- verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
- verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
- verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+ verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
+ verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2);
verifyRun("SELECT a from " + dbName + ".ptned_empty", empty);
verifyRun("SELECT * from " + dbName + ".unptned_empty", empty);
}
@Test
public void testBasicWithCM() throws Exception {
-
- String testName = "basic_with_cm";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -218,10 +235,10 @@ public class TestReplicationScenarios {
String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
- String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+ String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -334,11 +351,8 @@ public class TestReplicationScenarios {
@Test
public void testIncrementalAdds() throws IOException {
- String testName = "incrementalAdds";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -357,9 +371,9 @@ public class TestReplicationScenarios {
String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -421,11 +435,8 @@ public class TestReplicationScenarios {
@Test
public void testDrops() throws IOException {
- String testName = "drops";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -436,9 +447,9 @@ public class TestReplicationScenarios {
String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -535,10 +546,7 @@ public class TestReplicationScenarios {
public void testDropsWithCM() throws IOException {
String testName = "drops_with_cm";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -661,10 +669,7 @@ public class TestReplicationScenarios {
public void testAlters() throws IOException {
String testName = "alters";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -846,10 +851,7 @@ public class TestReplicationScenarios {
@Test
public void testIncrementalLoad() throws IOException {
String testName = "incrementalLoad";
- LOG.info("Testing " + testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -934,10 +936,7 @@ public class TestReplicationScenarios {
@Test
public void testIncrementalInserts() throws IOException {
String testName = "incrementalInserts";
- LOG.info("Testing " + testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
advanceDumpDir();
@@ -1062,10 +1061,7 @@ public class TestReplicationScenarios {
@Test
public void testViewsReplication() throws IOException {
String testName = "viewsReplication";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -1142,11 +1138,8 @@ public class TestReplicationScenarios {
@Test
public void testDumpLimit() throws IOException {
- String testName = "dumpLimit";
- LOG.info("Testing " + testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
advanceDumpDir();
@@ -1530,11 +1523,8 @@ public class TestReplicationScenarios {
// Now, to actually test status - first, we bootstrap.
- String testName = "incrementalStatus";
- LOG.info("Testing " + testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
advanceDumpDir();
run("REPL DUMP " + dbName);
String lastReplDumpLocn = getResult(0, 0);
@@ -1589,6 +1579,13 @@ public class TestReplicationScenarios {
}
+ private static String createDB(String name) {
+ LOG.info("Testing " + name);
+ String dbName = name + "_" + tid;
+ run("CREATE DATABASE " + dbName);
+ return dbName;
+ }
+
@Test
public void testEventFilters(){
// Test testing that the filters introduced by EventUtils are working correctly.
@@ -1749,18 +1746,25 @@ public class TestReplicationScenarios {
return (lastResults.get(rowNum).split("\\t"))[colNum];
}
+ /**
+ * Results read back from Hive output do not preserve case; they come back
+ * all in lower case, so we compare against lower-cased expected values only.
+ * The exception is NULL values, which Hive returns in upper case, hence
+ * both sides are explicitly lower-cased before the assert.
+ */
private void verifyResults(String[] data) throws IOException {
List<String> results = getOutput();
- LOG.info("Expecting {}",data);
- LOG.info("Got {}",results);
- assertEquals(data.length,results.size());
- for (int i = 0; i < data.length; i++){
- assertEquals(data[i],results.get(i));
+ LOG.info("Expecting {}", data);
+ LOG.info("Got {}", results);
+ assertEquals(data.length, results.size());
+ for (int i = 0; i < data.length; i++) {
+ assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
}
}
private List<String> getOutput() throws IOException {
- List<String> results = new ArrayList<String>();
+ List<String> results = new ArrayList<>();
try {
driver.getResults(results);
} catch (CommandNeedRetryException e) {
@@ -1848,5 +1852,4 @@ public class TestReplicationScenarios {
}
}
}
-
}
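The test refactor above replaces per-test hard-coded names with JUnit's TestName rule plus a shared createDB helper. A minimal sketch of that pattern, assuming the rule is declared on the test class as in stock JUnit 4 (class and literal values here are illustrative, not from the patch):

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TestName;

    public class TestNameRuleSketch {
      // Exposes the currently running test method's name.
      @Rule
      public final TestName testName = new TestName();

      @Test
      public void testDrops() {
        String name = testName.getMethodName(); // "testDrops"
        // The real suite appends a per-run id (tid) for uniqueness:
        String dbName = name + "_" + 42;
        // run("CREATE DATABASE " + dbName); ...
      }
    }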
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5b49dfd..5b908e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1437,7 +1437,6 @@ public class Hive {
*/
public List<String> getTablesByType(String dbName, String pattern, TableType type)
throws HiveException {
- List<String> retList = new ArrayList<String>();
if (dbName == null)
dbName = SessionState.get().getCurrentDatabase();
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 1ea5182..a9384be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -18,43 +18,36 @@
package org.apache.hadoop.hive.ql.parse;
-import com.google.common.base.Function;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
-import org.apache.hadoop.hive.ql.parse.repl.dump.ReplicationSpecSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.TableSerializer;
-import org.apache.thrift.TDeserializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.ReplicationSpecSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.TableSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.json.JSONArray;
import org.json.JSONException;
-import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -270,124 +263,30 @@ public class EximUtil {
}
}
- /**
- * Utility class to help return complex value from readMetaData function
- */
- public static class ReadMetaData {
- private final Database db;
- private final Table table;
- private final Iterable<Partition> partitions;
- private final ReplicationSpec replicationSpec;
-
- public ReadMetaData(){
- this(null,null,null,new ReplicationSpec());
- }
- public ReadMetaData(Database db, Table table, Iterable<Partition> partitions, ReplicationSpec replicationSpec){
- this.db = db;
- this.table = table;
- this.partitions = partitions;
- this.replicationSpec = replicationSpec;
- }
-
- public Database getDatabase(){
- return db;
- }
-
- public Table getTable() {
- return table;
- }
-
- public Iterable<Partition> getPartitions() {
- return partitions;
- }
-
- public ReplicationSpec getReplicationSpec() {
- return replicationSpec;
- }
- };
-
- public static ReadMetaData readMetaData(FileSystem fs, Path metadataPath)
+ static MetaData readMetaData(FileSystem fs, Path metadataPath)
throws IOException, SemanticException {
- FSDataInputStream mdstream = null;
+ String message = readAsString(fs, metadataPath);
try {
- mdstream = fs.open(metadataPath);
+ return new MetadataJson(message).getMetaData();
+ } catch (TException | JSONException e) {
+ throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
+ }
+ }
+
+ private static String readAsString(final FileSystem fs, final Path fromMetadataPath)
+ throws IOException {
+ try (FSDataInputStream stream = fs.open(fromMetadataPath)) {
byte[] buffer = new byte[1024];
ByteArrayOutputStream sb = new ByteArrayOutputStream();
- int read = mdstream.read(buffer);
+ int read = stream.read(buffer);
while (read != -1) {
sb.write(buffer, 0, read);
- read = mdstream.read(buffer);
- }
- String md = new String(sb.toByteArray(), "UTF-8");
- JSONObject jsonContainer = new JSONObject(md);
- String version = jsonContainer.getString("version");
- String fcversion = getJSONStringEntry(jsonContainer, "fcversion");
- checkCompatibility(version, fcversion);
-
- String dbDesc = getJSONStringEntry(jsonContainer, "db");
- String tableDesc = getJSONStringEntry(jsonContainer,"table");
- TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
-
- Database db = null;
- if (dbDesc != null){
- db = new Database();
- deserializer.deserialize(db, dbDesc, "UTF-8");
- }
-
- Table table = null;
- List<Partition> partitionsList = null;
- if (tableDesc != null){
- table = new Table();
- deserializer.deserialize(table, tableDesc, "UTF-8");
- // TODO : jackson-streaming-iterable-redo this
- JSONArray jsonPartitions = new JSONArray(jsonContainer.getString("partitions"));
- partitionsList = new ArrayList<Partition>(jsonPartitions.length());
- for (int i = 0; i < jsonPartitions.length(); ++i) {
- String partDesc = jsonPartitions.getString(i);
- Partition partition = new Partition();
- deserializer.deserialize(partition, partDesc, "UTF-8");
- partitionsList.add(partition);
- }
- }
-
- return new ReadMetaData(db, table, partitionsList,readReplicationSpec(jsonContainer));
- } catch (JSONException e) {
- throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
- } catch (TException e) {
- throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
- } finally {
- if (mdstream != null) {
- mdstream.close();
+ read = stream.read(buffer);
}
+ return new String(sb.toByteArray(), "UTF-8");
}
}
- private static ReplicationSpec readReplicationSpec(final JSONObject jsonContainer){
- Function<String,String> keyFetcher = new Function<String, String>() {
- @Override
- public String apply(@Nullable String s) {
- return getJSONStringEntry(jsonContainer,s);
- }
- };
- return new ReplicationSpec(keyFetcher);
- }
-
- private static String getJSONStringEntry(JSONObject jsonContainer, String name) {
- String retval = null;
- try {
- retval = jsonContainer.getString(name);
- } catch (JSONException ignored) {}
- return retval;
- }
-
- /* check the forward and backward compatibility */
- private static void checkCompatibility(String version, String fcVersion) throws SemanticException {
- doCheckCompatibility(
- METADATA_FORMAT_VERSION,
- version,
- fcVersion);
- }
-
/* check the forward and backward compatibility */
public static void doCheckCompatibility(String currVersion,
String version, String fcVersion) throws SemanticException {
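readMetaData above now delegates the raw read to readAsString, whose try-with-resources block closes the FSDataInputStream on all paths, replacing the old explicit finally/close bookkeeping. A self-contained sketch of the same pattern, with an illustrative path and configuration:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadAsStringSketch {
      static String readAsString(FileSystem fs, Path path) throws IOException {
        // Stream is closed automatically, even when read() throws.
        try (FSDataInputStream stream = fs.open(path)) {
          ByteArrayOutputStream sb = new ByteArrayOutputStream();
          byte[] buffer = new byte[1024];
          int read;
          while ((read = stream.read(buffer)) != -1) {
            sb.write(buffer, 0, read);
          }
          return new String(sb.toByteArray(), StandardCharsets.UTF_8);
        }
      }

      public static void main(String[] args) throws IOException {
        Path p = new Path("/tmp/_metadata"); // illustrative path
        System.out.println(readAsString(p.getFileSystem(new Configuration()), p));
      }
    }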
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 71d6074..dc86942 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -38,7 +37,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
@@ -60,6 +58,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -186,7 +185,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
FileSystem fs = FileSystem.get(fromURI, x.getConf());
x.getInputs().add(toReadEntity(fromPath, x.getConf()));
- EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+ MetaData rv = new MetaData();
try {
rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 37aa3ba..2daa123 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,77 +17,70 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
import com.google.common.primitives.Ints;
import org.antlr.runtime.tree.Tree;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.Predicate;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventUtils;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
@@ -120,154 +113,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
- public static final String DUMPMETADATA = "_dumpmetadata";
-
- public enum DUMPTYPE {
- BOOTSTRAP("BOOTSTRAP"),
- INCREMENTAL("INCREMENTAL"),
- EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"),
- EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"),
- EVENT_DROP_TABLE("EVENT_DROP_TABLE"),
- EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
- EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
- EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
- EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
- EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
- EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
- EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
- EVENT_INSERT("EVENT_INSERT"),
- EVENT_UNKNOWN("EVENT_UNKNOWN");
-
- String type = null;
- DUMPTYPE(String type) {
- this.type = type;
- }
-
- @Override
- public String toString(){
- return type;
- }
-
- };
-
- public static class DumpMetaData {
- // wrapper class for reading and writing metadata about a dump
- // responsible for _dumpmetadata files
-
- private DUMPTYPE dumpType;
- private Long eventFrom = null;
- private Long eventTo = null;
- private String payload = null;
- private boolean initialized = false;
-
- private final Path dumpRoot;
- private final Path dumpFile;
- private final HiveConf hiveConf;
- private Path cmRoot;
-
- public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
- this.dumpRoot = dumpRoot;
- this.hiveConf = hiveConf;
- dumpFile = new Path(dumpRoot, DUMPMETADATA);
- }
-
- public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot,
- HiveConf hiveConf){
- this(dumpRoot,hiveConf);
- setDump(lvl, eventFrom, eventTo, cmRoot);
- }
-
- public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
- this.dumpType = lvl;
- this.eventFrom = eventFrom;
- this.eventTo = eventTo;
- this.initialized = true;
- this.cmRoot = cmRoot;
- }
-
- public void loadDumpFromFile() throws SemanticException {
- try {
- // read from dumpfile and instantiate self
- FileSystem fs = dumpFile.getFileSystem(hiveConf);
- BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
- String line = null;
- if ( (line = br.readLine()) != null){
- String[] lineContents = line.split("\t", 5);
- setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]),
- new Path(lineContents[3]));
- setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]);
- ReplChangeManager.setCmRoot(cmRoot);
- } else {
- throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString());
- }
- } catch (IOException ioe){
- throw new SemanticException(ioe);
- }
- }
-
- public DUMPTYPE getDumpType() throws SemanticException {
- initializeIfNot();
- return this.dumpType;
- }
-
- public String getPayload() throws SemanticException {
- initializeIfNot();
- return this.payload;
- }
-
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
- public Long getEventFrom() throws SemanticException {
- initializeIfNot();
- return eventFrom;
- }
-
- public Long getEventTo() throws SemanticException {
- initializeIfNot();
- return eventTo;
- }
-
- public Path getCmRoot() {
- return cmRoot;
- }
-
- public void setCmRoot(Path cmRoot) {
- this.cmRoot = cmRoot;
- }
-
- public Path getDumpFilePath() {
- return dumpFile;
- }
-
- public boolean isIncrementalDump() throws SemanticException {
- initializeIfNot();
- return (this.dumpType == DUMPTYPE.INCREMENTAL);
- }
-
- private void initializeIfNot() throws SemanticException {
- if (!initialized){
- loadDumpFromFile();
- }
- }
-
- public void write() throws SemanticException {
- writeOutput(
- Arrays.asList(
- dumpType.toString(),
- eventFrom.toString(),
- eventTo.toString(),
- cmRoot.toString(),
- payload),
- dumpFile,
- hiveConf
- );
- }
- }
-
- public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
@@ -387,7 +234,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.info(
"Consolidation done, preparing to return {},{}->{}",
dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
- dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
+ dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
dmd.write();
// Set the correct last repl id to return to the user
@@ -433,10 +280,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
- writeOutput(
- Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(lastReplId)),
+ Utils.writeOutput(
+ Arrays.asList(
+ "incremental",
+ String.valueOf(eventFrom),
+ String.valueOf(lastReplId)
+ ),
dmd.getDumpFilePath(), conf);
- dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, lastReplId, cmRoot);
+ dmd.setDump(DumpType.INCREMENTAL, eventFrom, lastReplId, cmRoot);
dmd.write();
}
prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
@@ -463,7 +314,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
testInjectDumpDir = dumpdir;
}
- String getNextDumpDir() {
+ private String getNextDumpDir() {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
// make it easy to write .q unit tests, instead of unique id generation.
// however, this does mean that in writing tests, we have to be aware that
@@ -494,8 +345,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// TODO : instantiating FS objects are generally costly. Refactor
FileSystem fs = dbRoot.getFileSystem(conf);
Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
- Database dbObj = db.getDatabase(dbName);
- EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
+ HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
+ EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e);
@@ -513,9 +364,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// TODO : This should ideally return the Function Objects and not Strings(function names) that should be done by the caller, Look at this separately.
List<String> functionNames = db.getFunctions(dbName, "*");
for (String functionName : functionNames) {
- org.apache.hadoop.hive.metastore.api.Function function =
- db.getFunction(dbName, functionName);
- if (function.getResourceUris().isEmpty()) {
+ HiveWrapper.Tuple<Function> tuple;
+ try {
+ tuple = new HiveWrapper(db, dbName).function(functionName);
+ } catch (HiveException e) {
+ // This can happen because we list function names first and then fetch each
+ // function object; a user may drop the function in between, in which case
+ // the lookup fails. That is a valid state, so we skip the function.
+ LOG.info("Function " + functionName + " could not be found, ignoring it as this can be a valid state", e);
+ continue;
+ }
+ if (tuple.object.getResourceUris().isEmpty()) {
SESSION_STATE_LOG.warn(
"Not replicating function: " + functionName + " as it seems to have been created "
+ "without USING clause");
@@ -526,7 +384,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
new Path(new Path(functionsRoot, functionName), FUNCTION_METADATA_DIR_NAME);
try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf),
functionMetadataRoot)) {
- new FunctionSerializer(function).writeTo(jsonWriter, getNewReplicationSpec());
+ new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec);
}
}
} catch (Exception e) {
@@ -738,7 +596,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
taskChainTail = barrierTask;
evstage++;
- lastEvid = dmd.eventTo;
+ lastEvid = dmd.getEventTo();
}
}
@@ -1121,7 +979,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// associated with that
// Then, we iterate over all subdirs, and create table imports for each.
- EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+ MetaData rv = new MetaData();
try {
rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
} catch (IOException e) {
@@ -1163,15 +1021,67 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
rootTasks.add(dbRootTask);
FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
- for (FileStatus tableDir : dirsInDbPath) {
+ for (FileStatus tableDir : Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) {
analyzeTableLoad(
dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null);
}
+
+ //Function load
+ Path functionMetaDataRoot = new Path(dir.getPath(), FUNCTIONS_ROOT_DIR_NAME);
+ if (fs.exists(functionMetaDataRoot)) {
+ List<FileStatus> functionDirectories =
+ Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs)));
+ for (FileStatus functionDir : functionDirectories) {
+ analyzeFunctionLoad(dbName, functionDir, dbRootTask);
+ }
+ }
} catch (Exception e) {
throw new SemanticException(e);
}
}
+ private static class TableDirPredicate implements Predicate<FileStatus> {
+ @Override
+ public boolean apply(FileStatus fileStatus) {
+ return !fileStatus.getPath().getName().contains(FUNCTIONS_ROOT_DIR_NAME);
+ }
+ }
+
+ private void analyzeFunctionLoad(String dbName, FileStatus functionDir,
+ Task<? extends Serializable> createDbTask) throws IOException, SemanticException {
+ URI fromURI = EximUtil
+ .getValidatedURI(conf, stripQuotes(functionDir.getPath().toUri().toString()));
+ Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+ FileSystem fs = FileSystem.get(fromURI, conf);
+ inputs.add(toReadEntity(fromPath, conf));
+
+ try {
+ MetaData metaData = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+ ReplicationSpec replicationSpec = metaData.getReplicationSpec();
+ if (replicationSpec.isNoop()) {
+ // nothing to do here, silently return.
+ return;
+ }
+ CreateFunctionDesc desc = new CreateFunctionDesc(
+ dbName + "." + metaData.function.getFunctionName(),
+ false,
+ metaData.function.getClassName(),
+ metaData.function.getResourceUris()
+ );
+
+ Task<FunctionWork> currentTask = TaskFactory.get(new FunctionWork(desc), conf);
+ if (createDbTask != null) {
+ createDbTask.addDependentTask(currentTask);
+ LOG.debug("Added {}:{} as a precursor of {}:{}",
+ createDbTask.getClass(), createDbTask.getId(), currentTask.getClass(),
+ currentTask.getId());
+ }
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ }
+ }
+
private List<Task<? extends Serializable>> analyzeTableLoad(
String dbName, String tblName, String locn,
Task<? extends Serializable> precursor,
@@ -1270,27 +1180,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.debug(" > " + s);
}
ctx.setResFile(ctx.getLocalTmpPath());
- writeOutput(values, ctx.getResFile(), conf);
- }
-
- private static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
- throws SemanticException {
- FileSystem fs = null;
- DataOutputStream outStream = null;
- try {
- fs = outputFile.getFileSystem(hiveConf);
- outStream = fs.create(outputFile);
- outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
- for (int i = 1; i < values.size(); i++) {
- outStream.write(Utilities.tabCode);
- outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
- }
- outStream.write(Utilities.newLineCode);
- } catch (IOException e) {
- throw new SemanticException(e);
- } finally {
- IOUtils.closeStream(outStream);
- }
+ Utils.writeOutput(values, ctx.getResFile(), conf);
}
private ReplicationSpec getNewReplicationSpec() throws SemanticException {
@@ -1327,14 +1217,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase();
static Iterable<String> removeValuesTemporaryTables(List<String> tableNames) {
- List<String> allTables = new ArrayList<>(tableNames);
- CollectionUtils.filter(allTables, new Predicate() {
- @Override
- public boolean evaluate(Object tableName) {
- return !tableName.toString().toLowerCase().startsWith(TMP_TABLE_PREFIX);
- }
- });
- return allTables;
+ return Collections2.filter(tableNames,
+ tableName -> {
+ assert tableName != null;
+ return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX);
+ });
}
private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
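removeValuesTemporaryTables above trades the commons-collections Predicate (which mutated a copied list in place) for Guava's Collections2.filter, which returns a typed, live filtered view. A hedged sketch of the idiom; the prefix constant is illustrative:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    import com.google.common.collect.Collections2;

    public class FilterSketch {
      private static final String TMP_TABLE_PREFIX = "values__tmp__table__";

      static Collection<String> removeValuesTemporaryTables(List<String> tableNames) {
        // Returns a live view; no copy of the input list is made.
        return Collections2.filter(tableNames,
            tableName -> !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX));
      }

      public static void main(String[] args) {
        List<String> names = Arrays.asList("t1", "values__tmp__table__1", "t2");
        System.out.println(removeValuesTemporaryTables(names)); // [t1, t2]
      }
    }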
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
new file mode 100644
index 0000000..b1df5a3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl;
+
+public enum DumpType {
+ BOOTSTRAP("BOOTSTRAP"),
+ INCREMENTAL("INCREMENTAL"),
+ EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"),
+ EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"),
+ EVENT_DROP_TABLE("EVENT_DROP_TABLE"),
+ EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
+ EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
+ EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
+ EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
+ EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
+ EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
+ EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
+ EVENT_INSERT("EVENT_INSERT"),
+ EVENT_UNKNOWN("EVENT_UNKNOWN");
+
+ String type = null;
+ DumpType(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public String toString(){
+ return type;
+ }
+}
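Since every constant's type string equals its name, DumpType round-trips cleanly through valueOf, which is how DumpMetaData parses the first field of the _dumpmetadata line. For instance:

    DumpType t = DumpType.BOOTSTRAP;
    String s = t.toString();             // "BOOTSTRAP"
    DumpType back = DumpType.valueOf(s); // parses the serialized form
    assert t == back;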
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
new file mode 100644
index 0000000..ae37c73
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<ReplicationSpec> {
+ private final Hive db;
+
+ BootStrapReplicationSpecFunction(Hive db) {
+ this.db = db;
+ }
+
+ @Override
+ public ReplicationSpec fromMetaStore() throws HiveException {
+ try {
+ ReplicationSpec replicationSpec =
+ new ReplicationSpec(
+ true,
+ false,
+ "replv2",
+ "will-be-set",
+ false,
+ true,
+ false
+ );
+ long currentNotificationId = db.getMSC()
+ .getCurrentNotificationEventId().getEventId();
+ replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId));
+ return replicationSpec;
+ } catch (Exception e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
deleted file mode 100644
index 40770de..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-
-public class DBSerializer implements JsonWriter.Serializer {
- private final Database dbObject;
-
- public DBSerializer(Database dbObject) {
- this.dbObject = dbObject;
- }
-
- @Override
- public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- dbObject.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
- additionalPropertiesProvider.getCurrentReplicationState()
- );
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- try {
- String value = serializer.toString(dbObject, "UTF-8");
- writer.jsonGenerator.writeStringField("db", value);
- } catch (TException e) {
- throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
- }
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
deleted file mode 100644
index 6b03766..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-
-public class FunctionSerializer implements JsonWriter.Serializer {
- private Function function;
-
- public FunctionSerializer(Function function) {
- this.function = function;
- }
-
- @Override
- public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- try {
- writer.jsonGenerator
- .writeStringField("function", serializer.toString(function, "UTF-8"));
- } catch (TException e) {
- throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
new file mode 100644
index 0000000..1dcaec2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+/**
+ * Wraps metastore access so that statement ordering is enforced in one place:
+ * the replication id must be queried from the db before any query that
+ * fetches the object itself (tables/functions/partitions etc.), and this
+ * class encapsulates each such ordered pair of calls.
+ */
+
+public class HiveWrapper {
+ private final Hive db;
+ private final String dbName;
+ private final BootStrapReplicationSpecFunction functionForSpec;
+
+ public HiveWrapper(Hive db, String dbName) {
+ this.dbName = dbName;
+ this.db = db;
+ this.functionForSpec = new BootStrapReplicationSpecFunction(db);
+ }
+
+ public Tuple<org.apache.hadoop.hive.metastore.api.Function> function(final String name)
+ throws HiveException {
+ return new Tuple<>(functionForSpec, () -> db.getFunction(dbName, name));
+ }
+
+ public Tuple<Database> database() throws HiveException {
+ return new Tuple<>(functionForSpec, () -> db.getDatabase(dbName));
+ }
+
+ public static class Tuple<T> {
+
+ interface Function<T> {
+ T fromMetaStore() throws HiveException;
+ }
+
+ public final ReplicationSpec replicationSpec;
+ public final T object;
+
+ /**
+ * The ReplicationSpec must be obtained before the object is queried from
+ * the Hive metastore, because creating the spec captures the latest event
+ * id for replication. We don't want to miss any events, so we accept
+ * replaying some of them during incremental load; that still converges to
+ * a consistent state of the warehouse.
+ */
+ Tuple(Function<ReplicationSpec> replicationSpecFunction,
+ Function<T> functionForObject) throws HiveException {
+ this.replicationSpec = replicationSpecFunction.fromMetaStore();
+ this.object = functionForObject.fromMetaStore();
+ }
+ }
+}
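An illustrative caller's-eye view of that ordering guarantee (not part of the patch; fs and dumpPath are assumed to be in scope): constructing a Tuple captures the replication spec first and only then fetches the object.

    HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
    // database.replicationSpec was created before database.object was fetched,
    // so anything that changed in between is at worst replayed incrementally.
    EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);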
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
deleted file mode 100644
index 1aa1195..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_VERSION;
-
-public class JsonWriter implements Closeable {
-
- final JsonGenerator jsonGenerator;
-
- public JsonWriter(FileSystem fs, Path writePath) throws IOException {
- OutputStream out = fs.create(writePath);
- jsonGenerator = new JsonFactory().createJsonGenerator(out);
- jsonGenerator.writeStartObject();
- jsonGenerator.writeStringField("version", METADATA_FORMAT_VERSION);
- }
-
- @Override
- public void close() throws IOException {
- jsonGenerator.writeEndObject();
- jsonGenerator.close();
- }
-
- public interface Serializer {
- void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws
- SemanticException, IOException;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
deleted file mode 100644
index 313d108..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-import java.util.Map;
-
-class PartitionSerializer implements JsonWriter.Serializer {
- private Partition partition;
-
- PartitionSerializer(Partition partition) {
- this.partition = partition;
- }
-
- @Override
- public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- try {
- if (additionalPropertiesProvider.isInReplicationScope()) {
- partition.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
- additionalPropertiesProvider.getCurrentReplicationState());
- if (isPartitionExternal()) {
- // Replication destination will not be external
- partition.putToParameters("EXTERNAL", "FALSE");
- }
- }
- writer.jsonGenerator.writeString(serializer.toString(partition, "UTF-8"));
- writer.jsonGenerator.flush();
- } catch (TException e) {
- throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
- }
- }
-
- private boolean isPartitionExternal() {
- Map<String, String> params = partition.getParameters();
- return params.containsKey("EXTERNAL")
- && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
deleted file mode 100644
index d88a553..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-
-import java.io.IOException;
-
-public class ReplicationSpecSerializer implements JsonWriter.Serializer {
- @Override
- public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()) {
- String value = additionalPropertiesProvider.get(key);
- if (value != null) {
- writer.jsonGenerator.writeStringField(key.toString(), value);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
deleted file mode 100644
index a2e258f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class TableSerializer implements JsonWriter.Serializer {
- private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
- private final Iterable<Partition> partitions;
-
- public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle,
- Iterable<Partition> partitions) {
- this.tableHandle = tableHandle;
- this.partitions = partitions;
- }
-
- @Override
- public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- if (cannotReplicateTable(additionalPropertiesProvider)) {
- return;
- }
-
- Table tTable = tableHandle.getTTable();
- tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
- try {
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- writer.jsonGenerator
- .writeStringField("table", serializer.toString(tTable, "UTF-8"));
- writer.jsonGenerator.writeFieldName("partitions");
- writePartitions(writer, additionalPropertiesProvider);
- } catch (TException e) {
- throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
- }
- }
-
- private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvider) {
- return tableHandle == null || additionalPropertiesProvider.isNoop();
- }
-
- private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- if (additionalPropertiesProvider.isInReplicationScope()) {
- table.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
- additionalPropertiesProvider.getCurrentReplicationState());
- if (isExternalTable(table)) {
- // Replication destination will not be external - override if set
- table.putToParameters("EXTERNAL", "FALSE");
- }
- if (isExternalTableType(table)) {
- // Replication dest will not be external - override if set
- table.setTableType(TableType.MANAGED_TABLE.toString());
- }
- } else {
- // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
- // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
- // TODO: if we want to be explicit about this dump not being a replication dump, we can
- // uncomment this else section, but currently unnneeded. Will require a lot of golden file
- // regen if we do so.
- }
- return table;
- }
-
- private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) {
- return table.isSetTableType()
- && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
- }
-
- private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) {
- Map<String, String> params = table.getParameters();
- return params.containsKey("EXTERNAL")
- && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
- }
-
- private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- writer.jsonGenerator.writeStartArray();
- if (partitions != null) {
- for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
- new PartitionSerializer(partition.getTPartition())
- .writeTo(writer, additionalPropertiesProvider);
- }
- }
- writer.jsonGenerator.writeEndArray();
- }
-}
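(For reference: together with the "version" field that JsonWriter writes when it is opened,
a table dump produced by the class above is a single JSON object of the shape
{"version":"...","table":"<thrift json>","partitions":["<thrift json>", ...]}, where each
value is a Thrift object rendered through TJSONProtocol and embedded as one JSON string.
cannotReplicateTable short-circuits the whole write when the table handle is null or the
replication spec is a no-op.)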
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
new file mode 100644
index 0000000..846b6f5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class Utils {
+ public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
+ throws SemanticException {
+ DataOutputStream outStream = null;
+ try {
+ FileSystem fs = outputFile.getFileSystem(hiveConf);
+ outStream = fs.create(outputFile);
+ outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+ for (int i = 1; i < values.size(); i++) {
+ outStream.write(Utilities.tabCode);
+ outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+ }
+ outStream.write(Utilities.newLineCode);
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ } finally {
+ IOUtils.closeStream(outStream);
+ }
+ }
+}
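A usage note for the helper above: writeOutput renders one tab-separated record,
substituting Utilities.nullStringOutput for null values and terminating the record with a
newline; the list must be non-empty, since values.get(0) is read unconditionally. A minimal
sketch, assuming a live HiveConf and an illustrative output path:

    // Sketch only; exception handling elided, path and values are illustrative.
    List<String> values = Arrays.asList("default", "t1", "10");
    // Writes "default<TAB>t1<TAB>10<NL>" to /tmp/repl_status.
    Utils.writeOutput(values, new Path("/tmp/repl_status"), hiveConf);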
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
deleted file mode 100644
index 3ebc803..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.dump;
-
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-
-import java.io.IOException;
-
-import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION;
-
-/**
- * This is not used as of now, as the conditional that led to its usage is always false;
- * hence we have removed the conditional and the usage of this class, but it might be required in future.
- */
-public class VersionCompatibleSerializer implements JsonWriter.Serializer {
- @Override
- public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
- throws SemanticException, IOException {
- writer.jsonGenerator.writeStringField("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
new file mode 100644
index 0000000..15b7e13
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+
+public class DBSerializer implements JsonWriter.Serializer {
+ public static final String FIELD_NAME = "db";
+ private final Database dbObject;
+
+ public DBSerializer(Database dbObject) {
+ this.dbObject = dbObject;
+ }
+
+ @Override
+ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ dbObject.putToParameters(
+ ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ additionalPropertiesProvider.getCurrentReplicationState()
+ );
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ try {
+ String value = serializer.toString(dbObject, UTF_8);
+ writer.jsonGenerator.writeStringField(FIELD_NAME, value);
+ } catch (TException e) {
+ throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+ }
+ }
+}
+
+
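(Note the encoding used above: the Database object is serialized with Thrift's
TJSONProtocol and then written as a single JSON string value, so a database dump file
reads as {"version":"...","db":"<thrift json as one string>"} rather than as nested JSON,
and the current replication state is stamped into the object's parameters before
serialization.)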
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
new file mode 100644
index 0000000..5dc7023
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+
+public class FunctionSerializer implements JsonWriter.Serializer {
+ public static final String FIELD_NAME = "function";
+ private Function function;
+
+ public FunctionSerializer(Function function) {
+ this.function = function;
+ }
+
+ @Override
+ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+ throws SemanticException, IOException {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ try {
+ writer.jsonGenerator
+ .writeStringField(FIELD_NAME, serializer.toString(function, UTF_8));
+ } catch (TException e) {
+ throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+ }
+ }
+}
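The TSerializer step shared by all of these serializers can be exercised on its own; a
minimal sketch, assuming a metastore Function object is already in hand:

    // Sketch only: Thrift object -> JSON string, the same call used above.
    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
    String json = serializer.toString(function, "UTF-8"); // throws TException on failure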
http://git-wip-us.apache.org/repos/asf/hive/blob/9e9356b5/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java
new file mode 100644
index 0000000..e20be68
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_VERSION;
+
+public class JsonWriter implements Closeable {
+
+ final JsonGenerator jsonGenerator;
+
+ public JsonWriter(FileSystem fs, Path writePath) throws IOException {
+ OutputStream out = fs.create(writePath);
+ jsonGenerator = new JsonFactory().createJsonGenerator(out);
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeStringField("version", METADATA_FORMAT_VERSION);
+ }
+
+ @Override
+ public void close() throws IOException {
+ jsonGenerator.writeEndObject();
+ jsonGenerator.close();
+ }
+
+ public interface Serializer {
+ String UTF_8 = "UTF-8";
+ void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws
+ SemanticException, IOException;
+ }
+}
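Because JsonWriter is Closeable and close() is what writes the closing brace, it composes
naturally with try-with-resources; serializers only add fields and must leave the
generator open. A minimal end-to-end sketch, assuming fs, dbMetadataPath, database and
replicationSpec are in scope (SemanticException handling elided):

    // Sketch only: produces {"version":"...","db":"<thrift json>"} at dbMetadataPath.
    try (JsonWriter writer = new JsonWriter(fs, dbMetadataPath)) { // writes "{" and "version"
      new DBSerializer(database).writeTo(writer, replicationSpec); // adds the "db" field
    } // close() writes "}" and closes the generator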