You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/17 23:25:32 UTC

hive git commit: HIVE-16272 : support for drop function in incremental replication (Anishek Agarwal, reviewed by Sushanth Sowmyan)

Repository: hive
Updated Branches:
  refs/heads/master 029763517 -> 1f49635e8


HIVE-16272 : support for drop function in incremental replication (Anishek Agarwal, reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1f49635e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1f49635e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1f49635e

Branch: refs/heads/master
Commit: 1f49635e8ddbe2fbd8551d0a7f1b9ec618da29d0
Parents: 0297635
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Wed May 17 12:29:07 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Wed May 17 16:25:29 2017 -0700

----------------------------------------------------------------------
 ...TestReplicationScenariosAcrossInstances.java | 26 ++++++++++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java | 22 ++++-----
 .../hadoop/hive/ql/parse/repl/DumpType.java     |  7 +++
 .../repl/dump/events/DropFunctionHandler.java   | 42 +++++++++++++++++
 .../repl/dump/events/EventHandlerFactory.java   |  1 +
 .../repl/load/message/DropFunctionHandler.java  | 48 ++++++++++++++++++++
 .../repl/load/message/DropTableHandler.java     |  8 ++--
 7 files changed, 136 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 5621f26..3c1ef08 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -27,12 +27,14 @@ import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+
 public class TestReplicationScenariosAcrossInstances {
   @Rule
   public final TestName testName = new TestName();
 
   @Rule
-  public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule();
+  public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<String>());
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
 
@@ -64,7 +66,7 @@ public class TestReplicationScenariosAcrossInstances {
   }
 
   @Test
-  public void testIncrementalFunctionReplication() throws Throwable {
+  public void testCreateFunctionIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
     replica.load(replicatedDbName, bootStrapDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
@@ -83,6 +85,26 @@ public class TestReplicationScenariosAcrossInstances {
   }
 
   @Test
+  public void testDropFunctionIncrementalReplication() throws Throwable {
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+        + "using jar  'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verify(bootStrapDump.lastReplicationId);
+
+    primary.run("Drop FUNCTION " + primaryDbName + ".testFunction ");
+
+    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verify(incrementalDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '*testfunction*'")
+        .verify(null);
+  }
+
+  @Test
   public void testBootstrapFunctionReplication() throws Throwable {
     primary.run("CREATE FUNCTION " + primaryDbName
         + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "

http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f8bb248..7271eae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -42,9 +42,8 @@ class WarehouseInstance {
   private Driver driver;
   private HiveMetaStoreClient client;
   private HiveConf hconf;
-  private ReplicationV1CompatRule bcompat = null;
-
 
+  private static int schemaNameCounter = 0;
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
 
   /**
@@ -55,7 +54,6 @@ class WarehouseInstance {
     this.driver = other.driver;
     this.client = other.client;
     this.hconf = other.hconf;
-    this.bcompat = other.bcompat;
   }
 
   WarehouseInstance() throws Exception {
@@ -65,7 +63,7 @@ class WarehouseInstance {
         + Path.SEPARATOR
         + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
         + "_"
-        + System.currentTimeMillis();
+        + System.nanoTime();
 
     if (metaStoreUri != null) {
       hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
@@ -78,6 +76,10 @@ class WarehouseInstance {
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hconf.setVar(HiveConf.ConfVars.REPLCMDIR, hiveWarehouseLocation + "/cmroot/");
+    String schemaName = "APP" + schemaNameCounter++;
+    System.setProperty("datanucleus.mapping.Schema", schemaName);
+    hconf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
+        "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true");
     int metaStorePort = MetaStoreUtils.startMetaStore(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR, hiveWarehouseLocation + "/hrepl/");
     hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort);
@@ -95,8 +97,6 @@ class WarehouseInstance {
     driver = new Driver(hconf);
     SessionState.start(new CliSessionState(hconf));
     client = new HiveMetaStoreClient(hconf);
-
-    bcompat = new ReplicationV1CompatRule(client,hconf);
   }
 
   private int next = 0;
@@ -148,7 +148,7 @@ class WarehouseInstance {
   }
 
   WarehouseInstance verify(String data) throws IOException {
-    verifyResults(new String[] { data });
+    verifyResults(data == null ? new String[] {} : new String[] { data });
     return this;
   }
 
@@ -186,12 +186,8 @@ class WarehouseInstance {
     }
   }
 
-  public TestRule getReplivationV1CompatRule(){
-    return bcompat;
-  }
-
-  public void doBackwardCompatibilityCheck(boolean eventsMustExist) {
-    bcompat.doBackwardCompatibilityCheck(eventsMustExist);
+  public ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip){
+    return new ReplicationV1CompatRule(client,hconf,testsToSkip);
   }
 
   static class Tuple {

http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index 5fa9808..0580546 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl;
 
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DefaultHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.DropFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropPartitionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropTableHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.InsertHandler;
@@ -49,6 +50,12 @@ public enum DumpType {
       return new DropTableHandler();
     }
   },
+  EVENT_DROP_FUNCTION("EVENT_DROP_FUNCTION") {
+    @Override
+    public MessageHandler handler() {
+      return new DropFunctionHandler();
+    }
+  },
   EVENT_DROP_PARTITION("EVENT_DROP_PARTITION") {
     @Override
     public MessageHandler handler() {

http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
new file mode 100644
index 0000000..352b0cc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DropFunctionHandler extends AbstractEventHandler {
+
+  DropFunctionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_FUNCTION message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());  // the raw event message is dumped as the payload
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_FUNCTION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index 08dbd13..7e655fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -41,6 +41,7 @@ public class EventHandlerFactory {
     register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
     register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
     register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+    register(MessageFactory.DROP_FUNCTION_EVENT, DropFunctionHandler.class);
   }
 
   static void register(String event, Class<? extends EventHandler> handlerClazz) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
new file mode 100644
index 0000000..daf7b2a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DropFunctionDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+public class DropFunctionHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    DropFunctionMessage msg = deserializer.getDropFunctionMessage(context.dmd.getPayload());
+    String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+    String qualifiedFunctionName =
+        FunctionUtils.qualifyFunctionName(msg.getFunctionName(), actualDbName);
+    DropFunctionDesc desc = new DropFunctionDesc(qualifiedFunctionName, false);
+    Task<FunctionWork> dropFunctionTask = TaskFactory.get(new FunctionWork(desc), context.hiveConf);
+    context.log.debug(
+        "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName()
+    );
+    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    return Collections.singletonList(dropFunctionTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1f49635e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
index 943a6a6..e6e06c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -38,13 +38,15 @@ public class DropTableHandler extends AbstractMessageHandler {
     DropTableDesc dropTableDesc = new DropTableDesc(
         actualDbName + "." + actualTblName,
         null, true, true,
-        eventOnlyReplicationSpec(context));
+        eventOnlyReplicationSpec(context)
+    );
     Task<DDLWork> dropTableTask = TaskFactory.get(
         new DDLWork(readEntitySet, writeEntitySet, dropTableDesc),
         context.hiveConf
     );
-    context.log
-        .debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+    context.log.debug(
+        "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()
+    );
     databasesUpdated.put(actualDbName, context.dmd.getEventTo());
     return Collections.singletonList(dropTableTask);
   }