You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/17 23:25:32 UTC
hive git commit: HIVE-16272 : support for drop function in
incremental replication (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master 029763517 -> 1f49635e8
HIVE-16272 : support for drop function in incremental replication (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1f49635e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1f49635e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1f49635e
Branch: refs/heads/master
Commit: 1f49635e8ddbe2fbd8551d0a7f1b9ec618da29d0
Parents: 0297635
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Wed May 17 12:29:07 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Wed May 17 16:25:29 2017 -0700
----------------------------------------------------------------------
...TestReplicationScenariosAcrossInstances.java | 26 ++++++++++-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 22 ++++-----
.../hadoop/hive/ql/parse/repl/DumpType.java | 7 +++
.../repl/dump/events/DropFunctionHandler.java | 42 +++++++++++++++++
.../repl/dump/events/EventHandlerFactory.java | 1 +
.../repl/load/message/DropFunctionHandler.java | 48 ++++++++++++++++++++
.../repl/load/message/DropTableHandler.java | 8 ++--
7 files changed, 136 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 5621f26..3c1ef08 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -27,12 +27,14 @@ import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+
public class TestReplicationScenariosAcrossInstances {
@Rule
public final TestName testName = new TestName();
@Rule
- public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule();
+ public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<String>());
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
@@ -64,7 +66,7 @@ public class TestReplicationScenariosAcrossInstances {
}
@Test
- public void testIncrementalFunctionReplication() throws Throwable {
+ public void testCreateFunctionIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
replica.load(replicatedDbName, bootStrapDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
@@ -83,6 +85,26 @@ public class TestReplicationScenariosAcrossInstances {
}
@Test
+ public void testDropFunctionIncrementalReplication() throws Throwable { // create -> bootstrap -> drop -> incremental -> verify
+ primary.run("CREATE FUNCTION " + primaryDbName
+ + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+ + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+ WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+ replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName)
+ .verify(bootStrapDump.lastReplicationId);
+ // The drop is issued on the primary only after the bootstrap dump has been loaded on the replica.
+ primary.run("Drop FUNCTION " + primaryDbName + ".testFunction ");
+ // Dump again starting from the bootstrap's lastReplicationId, then load that dump on the replica.
+ WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+ replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName)
+ .verify(incrementalDump.lastReplicationId)
+ .run("SHOW FUNCTIONS LIKE '*testfunction*'")
+ .verify(null); // verify(null) compares against an empty expected-results array
+ }
+
+ @Test
public void testBootstrapFunctionReplication() throws Throwable {
primary.run("CREATE FUNCTION " + primaryDbName
+ ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f8bb248..7271eae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -42,9 +42,8 @@ class WarehouseInstance {
private Driver driver;
private HiveMetaStoreClient client;
private HiveConf hconf;
- private ReplicationV1CompatRule bcompat = null;
-
+ private static int schemaNameCounter = 0;
private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
/**
@@ -55,7 +54,6 @@ class WarehouseInstance {
this.driver = other.driver;
this.client = other.client;
this.hconf = other.hconf;
- this.bcompat = other.bcompat;
}
WarehouseInstance() throws Exception {
@@ -65,7 +63,7 @@ class WarehouseInstance {
+ Path.SEPARATOR
+ TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
+ "_"
- + System.currentTimeMillis();
+ + System.nanoTime();
if (metaStoreUri != null) {
hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
@@ -78,6 +76,10 @@ class WarehouseInstance {
hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
hconf.setVar(HiveConf.ConfVars.REPLCMDIR, hiveWarehouseLocation + "/cmroot/");
+ String schemaName = "APP" + schemaNameCounter++;
+ System.setProperty("datanucleus.mapping.Schema", schemaName);
+ hconf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
+ "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true");
int metaStorePort = MetaStoreUtils.startMetaStore(hconf);
hconf.setVar(HiveConf.ConfVars.REPLDIR, hiveWarehouseLocation + "/hrepl/");
hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort);
@@ -95,8 +97,6 @@ class WarehouseInstance {
driver = new Driver(hconf);
SessionState.start(new CliSessionState(hconf));
client = new HiveMetaStoreClient(hconf);
-
- bcompat = new ReplicationV1CompatRule(client,hconf);
}
private int next = 0;
@@ -148,7 +148,7 @@ class WarehouseInstance {
}
WarehouseInstance verify(String data) throws IOException {
- verifyResults(new String[] { data });
+ verifyResults(data == null ? new String[] {} : new String[] { data });
return this;
}
@@ -186,12 +186,8 @@ class WarehouseInstance {
}
}
- public TestRule getReplivationV1CompatRule(){
- return bcompat;
- }
-
- public void doBackwardCompatibilityCheck(boolean eventsMustExist) {
- bcompat.doBackwardCompatibilityCheck(eventsMustExist);
+ public ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip){
+ return new ReplicationV1CompatRule(client,hconf,testsToSkip);
}
static class Tuple {
http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index 5fa9808..0580546 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl;
import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
import org.apache.hadoop.hive.ql.parse.repl.load.message.DefaultHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.DropFunctionHandler;
import org.apache.hadoop.hive.ql.parse.repl.load.message.DropPartitionHandler;
import org.apache.hadoop.hive.ql.parse.repl.load.message.DropTableHandler;
import org.apache.hadoop.hive.ql.parse.repl.load.message.InsertHandler;
@@ -49,6 +50,12 @@ public enum DumpType {
return new DropTableHandler();
}
},
+ EVENT_DROP_FUNCTION("EVENT_DROP_FUNCTION") { // dump type reported by the dump-side DropFunctionHandler
+ @Override
+ public MessageHandler handler() {
+ return new DropFunctionHandler(); // the imported repl.load.message.DropFunctionHandler
+ }
+ },
EVENT_DROP_PARTITION("EVENT_DROP_PARTITION") {
@Override
public MessageHandler handler() {
http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
new file mode 100644
index 0000000..352b0cc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DropFunctionHandler extends AbstractEventHandler {
+  // Dump-side handler for drop-function notification events; see EventHandlerFactory registration.
+  DropFunctionHandler(NotificationEvent event) {
+    super(event);
+  }
+  // Logs the event, then writes the unmodified notification message as the dump metadata payload.
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_FUNCTION message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+  // Identifies this dump as EVENT_DROP_FUNCTION, whose handler() is the load-side DropFunctionHandler.
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_FUNCTION;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index 08dbd13..7e655fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -41,6 +41,7 @@ public class EventHandlerFactory {
register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+ register(MessageFactory.DROP_FUNCTION_EVENT, DropFunctionHandler.class);
}
static void register(String event, Class<? extends EventHandler> handlerClazz) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
new file mode 100644
index 0000000..daf7b2a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DropFunctionDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+public class DropFunctionHandler extends AbstractMessageHandler {
+  // Load-side handler: turns a dumped drop-function payload into a single drop-function task.
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+    DropFunctionMessage dropMessage =
+        deserializer.getDropFunctionMessage(context.dmd.getPayload());
+    // The database named in the message is used only when the load context does not supply one.
+    String targetDb = context.isDbNameEmpty() ? dropMessage.getDB() : context.dbName;
+    DropFunctionDesc desc = new DropFunctionDesc(
+        FunctionUtils.qualifyFunctionName(dropMessage.getFunctionName(), targetDb), false);
+    Task<FunctionWork> task = TaskFactory.get(new FunctionWork(desc), context.hiveConf);
+    context.log.debug("Added drop function task : {}:{}", task.getId(), desc.getFunctionName());
+    // Associate the affected database with this dump's getEventTo() value.
+    databasesUpdated.put(targetDb, context.dmd.getEventTo());
+    return Collections.singletonList(task);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
index 943a6a6..e6e06c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -38,13 +38,15 @@ public class DropTableHandler extends AbstractMessageHandler {
DropTableDesc dropTableDesc = new DropTableDesc(
actualDbName + "." + actualTblName,
null, true, true,
- eventOnlyReplicationSpec(context));
+ eventOnlyReplicationSpec(context)
+ );
Task<DDLWork> dropTableTask = TaskFactory.get(
new DDLWork(readEntitySet, writeEntitySet, dropTableDesc),
context.hiveConf
);
- context.log
- .debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+ context.log.debug(
+ "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()
+ );
databasesUpdated.put(actualDbName, context.dmd.getEventTo());
return Collections.singletonList(dropTableTask);
}