Posted to commits@hive.apache.org by sz...@apache.org on 2020/04/14 09:29:04 UTC
[hive] branch master updated: HIVE-23084: Implement kill query in multiple HS2 environment (Peter Varga, reviewed by Adam Szita)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new fcac603 HIVE-23084: Implement kill query in multiple HS2 environment (Peter Varga, reviewed by Adam Szita)
fcac603 is described below
commit fcac6034f4ba36a7d12d4f6af8be09511fedcaa6
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Tue Apr 14 11:20:50 2020 +0200
HIVE-23084: Implement kill query in multiple HS2 environment (Peter Varga, reviewed by Adam Szita)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 3 +
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 7 +
.../hive/jdbc/TestJdbcWithServiceDiscovery.java | 326 +++++++++++++
.../thrift/TestMiniHS2StateWithNoZookeeper.java | 5 +-
.../server/TestKillQueryZookeeperManager.java | 211 +++++++++
.../java/org/apache/hive/jdbc/miniHS2/MiniHS2.java | 4 +
.../java/org/apache/hive/jdbc/HiveStatement.java | 5 +-
.../ql/ddl/process/kill/KillQueriesOperation.java | 4 +-
.../hadoop/hive/ql/exec/tez/TezSessionState.java | 7 +-
.../hive/service/cli/session/HiveSessionImpl.java | 7 +-
.../hive/service/cli/session/SessionManager.java | 12 +
.../apache/hive/service/server/HiveServer2.java | 2 +-
.../apache/hive/service/server/KillQueryImpl.java | 173 ++++---
.../service/server/KillQueryZookeeperManager.java | 525 +++++++++++++++++++++
.../hadoop/hive/common/ZooKeeperHiveHelper.java | 4 +
16 files changed, 1230 insertions(+), 70 deletions(-)
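
Before the file-by-file diff, a minimal sketch of the scenario this patch enables: a query started on one HS2 instance is killed from a second instance, with the kill request forwarded through ZooKeeper. The sketch is illustrative only and is not part of the patch; URLs, credentials and the table name are placeholders, and the flow mirrors the new TestJdbcWithServiceDiscovery test below.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.apache.hive.jdbc.HiveStatement;

public class CrossServerKillSketch {
  public static void main(String[] args) throws Exception {
    // In a real deployment both connections would typically go through the ZooKeeper
    // quorum (serviceDiscoveryMode=zooKeeper), so they can land on different HS2 instances.
    Connection runConn = DriverManager.getConnection("jdbc:hive2://hs2-a:10000/default", "user", "");
    Connection killConn = DriverManager.getConnection("jdbc:hive2://hs2-b:10000/default", "user", "");

    HiveStatement runStmt = (HiveStatement) runConn.createStatement();
    runStmt.executeAsync("select count(*) from some_large_table"); // long-running query
    String queryId = runStmt.getQueryId();                         // id used by KILL QUERY

    // With hive.zookeeper.killquery.enable=true, hs2-b cannot find the query locally and
    // forwards the kill to the other registered HS2 instances via the new ZooKeeper barrier.
    Statement killStmt = killConn.createStatement();
    killStmt.execute("kill query '" + queryId + "'");

    killStmt.close();
    runStmt.close();
    killConn.close();
    runConn.close();
  }
}
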
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a08dd03..7b3acad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2697,6 +2697,11 @@ public class HiveConf extends Configuration {
"Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
"Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
"system property (note the camelCase)."),
+ HIVE_ZOOKEEPER_KILLQUERY_ENABLE("hive.zookeeper.killquery.enable", true,
+ "Whether enabled kill query coordination with zookeeper, " +
+ "when hive.server2.support.dynamic.service.discovery is enabled."),
+ HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE("hive.zookeeper.killquery.namespace", "killQueries",
+ "When kill query coordination is enabled, uses this namespace for registering queries to kill with zookeeper"),
// Transactions
HIVE_TXN_MANAGER("hive.txn.manager",
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 3973ec9..45b22f9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -784,6 +785,8 @@ public abstract class BaseJdbcWithMiniLlap {
con2.close();
assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+ tExecuteHolder.throwable.getMessage());
assertNull("tCancel", tKillHolder.throwable);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index 68a515c..1aab03d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -31,6 +31,7 @@ import java.util.List;
import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -412,6 +413,8 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false,
tExecuteHolder, tKillHolder);
assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+ tExecuteHolder.throwable.getMessage());
assertNull("tCancel", tKillHolder.throwable);
}
@@ -431,6 +434,8 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
ExceptionHolder tKillHolder = new ExceptionHolder();
testKillQueryInternal("user1", System.getProperty("user.name"), true, tExecuteHolder, tKillHolder);
assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+ tExecuteHolder.throwable.getMessage());
assertNull("tCancel", tKillHolder.throwable);
}
@@ -440,6 +445,8 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
ExceptionHolder tKillHolder = new ExceptionHolder();
testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder);
assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+ tExecuteHolder.throwable.getMessage());
assertNull("tCancel", tKillHolder.throwable);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java
new file mode 100644
index 0000000..1621e7e
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test JDBC driver when two HS2 instances are running with service discovery enabled.
+ */
+public class TestJdbcWithServiceDiscovery {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestJdbcWithServiceDiscovery.class);
+ private static final String TABLE_NAME = "testJdbcMinihs2Tbl";
+ private static final String DB_NAME = "testJdbcMinihs2";
+ private static final String REMOTE_ERROR_MESSAGE = "Unable to kill query locally or on remote servers.";
+
+ private static TestingServer zkServer;
+ private static MiniHS2 miniHS2server1;
+ private static MiniHS2 miniHS2server2;
+ private static String miniHS2directUrl1;
+ private static String miniHS2directUrl2;
+ private static Path kvDataFilePath;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ MiniHS2.cleanupLocalDir();
+ zkServer = new TestingServer();
+
+ // Create one MiniHS2 with Tez and one with Local FS only
+ HiveConf hiveConf1 = getTezConf();
+ HiveConf hiveConf2 = new HiveConf();
+
+ setSDConfigs(hiveConf1);
+ setSDConfigs(hiveConf2);
+
+ miniHS2server1 = new MiniHS2.Builder().withConf(hiveConf1).withMiniTez().build();
+ miniHS2server2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
+
+ Class.forName(MiniHS2.getJdbcDriverName());
+ String instanceId1 = UUID.randomUUID().toString();
+ miniHS2server1.start(getConfOverlay(instanceId1));
+ miniHS2directUrl1 =
+ "jdbc:hive2://" + miniHS2server1.getHost() + ":" + miniHS2server1.getBinaryPort() + "/" + DB_NAME;
+ String instanceId2 = UUID.randomUUID().toString();
+ miniHS2server2.start(getConfOverlay(instanceId2));
+ miniHS2directUrl2 =
+ "jdbc:hive2://" + miniHS2server2.getHost() + ":" + miniHS2server2.getBinaryPort() + "/" + DB_NAME;
+
+ String dataFileDir = hiveConf1.get("test.data.files").replace('\\', '/').replace("c:", "");
+ kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
+ setupDb();
+ }
+
+ /**
+ * SleepMsUDF.
+ */
+ public static class SleepMsUDF extends UDF {
+ public Integer evaluate(int value, int ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ // No-op
+ }
+ return value;
+ }
+ }
+
+ public static void setupDb() throws Exception {
+ Connection conDefault = DriverManager
+ .getConnection("jdbc:hive2://" + miniHS2server1.getHost() + ":" + miniHS2server1.getBinaryPort() + "/default",
+ System.getProperty("user.name"), "bar");
+ Statement stmt = conDefault.createStatement();
+ String tblName = DB_NAME + "." + TABLE_NAME;
+ stmt.execute("drop database if exists " + DB_NAME + " cascade");
+ stmt.execute("create database " + DB_NAME);
+ stmt.execute("use " + DB_NAME);
+ stmt.execute("create table " + tblName + " (int_col int, value string) ");
+ stmt.execute("load data local inpath '" + kvDataFilePath.toString() + "' into table " + tblName);
+ stmt.execute("grant select on table " + tblName + " to role public");
+
+ stmt.close();
+ conDefault.close();
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if ((miniHS2server1 != null) && miniHS2server1.isStarted()) {
+ try {
+ miniHS2server1.stop();
+ } catch (Exception e) {
+ LOG.warn("Error why shutting down Hs2", e);
+ }
+ }
+ if ((miniHS2server2 != null) && miniHS2server2.isStarted()) {
+ try {
+ miniHS2server2.stop();
+ } catch (Exception e) {
+ LOG.warn("Error why shutting down Hs2", e);
+ }
+ }
+ if (zkServer != null) {
+ zkServer.close();
+ zkServer = null;
+ }
+ MiniHS2.cleanupLocalDir();
+ }
+
+ private static HiveConf getTezConf() throws Exception {
+ String confDir = "../../data/conf/tez/";
+ HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+ System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+ HiveConf defaultConf = new HiveConf();
+ defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml"));
+ return defaultConf;
+ }
+
+ private static void setSDConfigs(HiveConf conf) {
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true);
+ conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE, false);
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, 2, TimeUnit.SECONDS);
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 100, TimeUnit.MILLISECONDS);
+ conf.setIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES, 1);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE, true);
+ }
+
+ private static Map<String, String> getConfOverlay(final String instanceId) {
+ Map<String, String> confOverlay = new HashMap<>();
+ confOverlay.put("hive.server2.zookeeper.publish.configs", "true");
+ confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId);
+ return confOverlay;
+ }
+
+ private static class ExceptionHolder {
+ Throwable throwable;
+ }
+
+ private void executeQueryAndKill(Connection con1, Connection con2, ExceptionHolder tExecuteHolder,
+ ExceptionHolder tKillHolder) throws SQLException, InterruptedException {
+ final HiveStatement stmt = (HiveStatement) con1.createStatement();
+ final Statement stmt2 = con2.createStatement();
+ final StringBuffer stmtQueryId = new StringBuffer();
+
+ // Thread executing the query
+ Thread tExecute = new Thread(() -> {
+ try {
+ LOG.info("Executing waiting query.");
+ // The test table has 500 rows and the UDF sleeps 10 ms per row, so the query runs long enough to be killed
+ stmt.executeAsync(
+ "select sleepMsUDF(t1.int_col, 10), t1.int_col, t2.int_col " + "from " + TABLE_NAME + " t1 join "
+ + TABLE_NAME + " t2 on t1.int_col = t2.int_col");
+ stmtQueryId.append(stmt.getQueryId());
+ stmt.getUpdateCount();
+ } catch (SQLException e) {
+ tExecuteHolder.throwable = e;
+ }
+ });
+
+ tExecute.start();
+
+ // wait for other thread to create the stmt handle
+ int count = 0;
+ while (count < 10) {
+ try {
+ Thread.sleep(2000);
+ String queryId;
+ if (stmtQueryId.length() != 0) {
+ queryId = stmtQueryId.toString();
+ } else {
+ count++;
+ continue;
+ }
+
+ LOG.info("Killing query: " + queryId);
+ stmt2.execute("kill query '" + queryId + "'");
+ stmt2.close();
+ break;
+ } catch (SQLException e) {
+ LOG.warn("Exception when kill query", e);
+ tKillHolder.throwable = e;
+ break;
+ }
+ }
+
+ tExecute.join();
+ try {
+ stmt.close();
+ con1.close();
+ con2.close();
+ } catch (Exception e) {
+ LOG.warn("Exception when close stmt and con", e);
+ }
+ }
+
+ @Test
+ public void testKillQueryWithSameServer() throws Exception {
+ Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+ Connection con2 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+
+ Statement stmt = con1.createStatement();
+ stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+ stmt.close();
+
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+
+ executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder);
+
+ assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertEquals("Query was cancelled. User invoked KILL QUERY", tExecuteHolder.throwable.getMessage());
+ assertNull("tCancel", tKillHolder.throwable);
+ }
+
+ @Test
+ public void testKillQueryWithDifferentServer() throws Exception {
+ Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+ Connection con2 = DriverManager.getConnection(miniHS2directUrl2, System.getProperty("user.name"), "bar");
+
+ Statement stmt = con1.createStatement();
+ stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+ stmt.close();
+
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+
+ executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder);
+
+ assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+ tExecuteHolder.throwable.getMessage());
+ assertNull("tCancel", tKillHolder.throwable);
+ }
+
+ @Test
+ public void testKillQueryWithDifferentServerZKTurnedOff() throws Exception {
+ Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+ Connection con2 = DriverManager.getConnection(miniHS2directUrl2, System.getProperty("user.name"), "bar");
+
+ Statement stmt = con1.createStatement();
+ stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+ stmt.close();
+
+ stmt = con2.createStatement();
+ stmt.execute("set hive.zookeeper.killquery.enable = false");
+ stmt.close();
+
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+
+ executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder);
+
+ assertNull("tExecute", tExecuteHolder.throwable);
+ assertNull("tCancel", tKillHolder.throwable);
+ }
+
+ @Test
+ public void testKillQueryWithRandomId() throws Exception {
+ Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+
+ Statement stmt = con1.createStatement();
+ String queryId = "randomId123";
+ try {
+ LOG.info("Killing query: " + queryId);
+ stmt.execute("kill query '" + queryId + "'");
+ stmt.close();
+ } catch (SQLException e) {
+ LOG.warn("Exception when kill query", e);
+ tKillHolder.throwable = e;
+ }
+ try {
+ con1.close();
+ } catch (Exception e) {
+ LOG.warn("Exception when close stmt and con", e);
+ }
+
+ assertNotNull("tCancel", tKillHolder.throwable);
+ assertTrue(tKillHolder.throwable.getMessage(), tKillHolder.throwable.getMessage().contains(REMOTE_ERROR_MESSAGE));
+ }
+}
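
The tests above connect to each MiniHS2 by direct URL; in a real service-discovery deployment clients usually connect through the ZooKeeper quorum, so the session issuing KILL QUERY can easily end up on a different HS2 than the one running the query. A hedged example of such a connection (hosts, namespace, and query id are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;

public class DiscoveryUrlSketch {
  public static void main(String[] args) throws Exception {
    // ZooKeeper hands out one of the registered HS2 instances for this session.
    String url = "jdbc:hive2://zk1:2181,zk2:2181,zk3:2181/default;"
        + "serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
    try (Connection conn = DriverManager.getConnection(url, "user", "")) {
      conn.createStatement().execute("kill query 'hive_20200414112050_placeholder'");
    }
  }
}
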
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java
index 99e681e..0df3058 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java
@@ -50,11 +50,14 @@ public class TestMiniHS2StateWithNoZookeeper {
private static HiveConf hiveConf = null;
@BeforeClass
- public static void beforeTest() throws Exception {
+ public static void beforeTest() throws Exception {
+ MiniHS2.cleanupLocalDir();
hiveConf = new HiveConf();
hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true);
hiveConf.setIntVar(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES, 0);
hiveConf.setTimeVar(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 0, TimeUnit.MILLISECONDS);
+ // Disable killquery, this way only HS2 start will fail, not the SessionManager service
+ hiveConf.setBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE, false);
miniHS2 = new MiniHS2(hiveConf);
Map<String, String> confOverlay = new HashMap<String, String>();
try {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java
new file mode 100644
index 0000000..d9997a9
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link KillQueryZookeeperManager}.
+ */
+public class TestKillQueryZookeeperManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestKillQueryZookeeperManager.class);
+ private static final String BARRIER_ROOT_PATH = "/killqueries";
+ private static final String QUERYID = "QUERY1";
+ private static final String SERVER1 = "localhost:1234";
+ private static final String SERVER2 = "localhost:1235";
+ private static final String USER = "user";
+ private static final int TIMEOUT = 1000;
+
+ TestingServer server;
+
+ @Before
+ public void setupZookeeper() throws Exception {
+ server = new TestingServer();
+ }
+
+ @After
+ public void shutdown() {
+ if (server != null) {
+ CloseableUtils.closeQuietly(server);
+ }
+ }
+
+ private CuratorFramework getClient() {
+ return CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(TIMEOUT * 100)
+ .connectionTimeoutMs(TIMEOUT).retryPolicy(new RetryOneTime(1)).build();
+ }
+
+ @Test
+ public void testBarrierServerCrash() throws Exception {
+ try (CuratorFramework client = getClient()) {
+ client.start();
+ client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+ final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+ new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+ barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+ final ExecutorService service = Executors.newSingleThreadExecutor();
+ Future<Object> future = service.submit(() -> {
+ Thread.sleep(TIMEOUT / 2);
+ server.stop();
+ return null;
+ });
+
+ barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS);
+ future.get();
+ Assert.fail();
+ } catch (KeeperException.ConnectionLossException expected) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testNoBarrier() throws Exception {
+ try (CuratorFramework client = getClient()) {
+ client.start();
+ client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+ IllegalStateException result = null;
+ final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+ new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+ try {
+ barrier.confirmProgress(SERVER1);
+ } catch (IllegalStateException e) {
+ result = e;
+ }
+ Assert.assertNotNull(result);
+ Assert.assertEquals("Barrier is not initialised", result.getMessage());
+ }
+ }
+
+ @Test
+ public void testNo() throws Exception {
+ try (CuratorFramework client = getClient()) {
+ client.start();
+ client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+ final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+ new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+ barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(() -> {
+ Thread.sleep(TIMEOUT / 2);
+ barrier.confirmNo(SERVER2);
+ return null;
+ });
+
+ Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testDone() throws Exception {
+ try (CuratorFramework client = getClient()) {
+ client.start();
+ client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+ final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+ new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+ barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(() -> {
+ Thread.sleep(TIMEOUT / 2);
+ try {
+ barrier.confirmProgress(SERVER2);
+ Thread.sleep(TIMEOUT / 2);
+ barrier.confirmDone(SERVER2);
+ } catch (Exception e) {
+ LOG.error("Confirmation error", e);
+ }
+ return null;
+ });
+
+ Assert.assertTrue(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testFailed() throws Exception {
+ try (CuratorFramework client = getClient()) {
+ client.start();
+ client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+ final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+ new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+ barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(() -> {
+ Thread.sleep(TIMEOUT / 2);
+ barrier.confirmProgress(SERVER2);
+ Thread.sleep(TIMEOUT / 2);
+ barrier.confirmFailed(SERVER2);
+ return null;
+ });
+
+ Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testConfirmTimeout() throws Exception {
+ try (CuratorFramework client = getClient()) {
+ client.start();
+ client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+ final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+ new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+ barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+ Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testKillTimeout() throws Exception {
+ try (CuratorFramework client = getClient()) {
+ client.start();
+ client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+ final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+ new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+ barrier.setBarrier(QUERYID, SERVER1, USER, true);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(() -> {
+ Thread.sleep(TIMEOUT / 2);
+ barrier.confirmProgress(SERVER2);
+ // server died
+ return null;
+ });
+ Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+ }
+ }
+}
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index eb80086..7f25c74 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -115,6 +115,10 @@ public class MiniHS2 extends AbstractHiveService {
this.miniClusterType = MiniClusterType.MR;
return this;
}
+ public Builder withMiniTez() {
+ this.miniClusterType = MiniClusterType.TEZ;
+ return this;
+ }
public Builder withMiniKdc(String serverPrincipal, String serverKeytab) {
this.useMiniKdc = true;
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index db965e7..543bf8c 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -63,6 +63,7 @@ import java.util.Objects;
public class HiveStatement implements java.sql.Statement {
public static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class.getName());
+ public static final String QUERY_CANCELLED_MESSAGE = "Query was cancelled.";
private static final int DEFAULT_FETCH_SIZE =
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;
@@ -394,9 +395,9 @@ public class HiveStatement implements java.sql.Statement {
// 01000 -> warning
String errMsg = statusResp.getErrorMessage();
if (errMsg != null && !errMsg.isEmpty()) {
- throw new SQLException("Query was cancelled. " + errMsg, "01000");
+ throw new SQLException(QUERY_CANCELLED_MESSAGE + " " + errMsg, "01000");
} else {
- throw new SQLException("Query was cancelled", "01000");
+ throw new SQLException(QUERY_CANCELLED_MESSAGE, "01000");
}
case TIMEDOUT_STATE:
throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java
index afde1a4..26c7fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java
@@ -31,11 +31,13 @@ public class KillQueriesOperation extends DDLOperation<KillQueriesDesc> {
super(context, desc);
}
+ public static final String KILL_QUERY_MESSAGE = "User invoked KILL QUERY";
+
@Override
public int execute() throws HiveException {
SessionState sessionState = SessionState.get();
for (String queryId : desc.getQueryIds()) {
- sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY", context.getDb().getConf());
+ sessionState.getKillQuery().killQuery(queryId, KILL_QUERY_MESSAGE, context.getDb().getConf());
}
LOG.info("kill query called ({})", desc.getQueryIds());
return 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 8becef1..5b3fa8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -842,8 +842,11 @@ public class TezSessionState {
private void addJarLRByClass(Class<?> clazz, final Map<String, LocalResource> lrMap) throws IOException,
LoginException {
- final File jar =
- new File(Utilities.jarFinderGetJar(clazz));
+ String jarPath = Utilities.jarFinderGetJar(clazz);
+ if (jarPath == null) {
+ throw new IOException("Can't find jar for: " + clazz);
+ }
+ final File jar = new File(jarPath);
final String localJarPath = jar.toURI().toURL().toExternalForm();
final LocalResource jarLr = createJarLocalResource(localJarPath);
lrMap.put(DagUtils.getBaseName(jarLr), jarLr);
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 9e49754..9c7ee54 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -76,6 +76,7 @@ import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.server.KillQueryImpl;
+import org.apache.hive.service.server.KillQueryZookeeperManager;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,7 +168,11 @@ public class HiveSessionImpl implements HiveSession {
} catch (Exception e) {
throw new HiveSQLException(e);
}
- sessionState.setKillQuery(new KillQueryImpl(operationManager));
+ KillQueryZookeeperManager killQueryZookeeperManager = null;
+ if (sessionManager != null) {
+ killQueryZookeeperManager = sessionManager.getKillQueryZookeeperManager();
+ }
+ sessionState.setKillQuery(new KillQueryImpl(operationManager, killQueryZookeeperManager));
SessionState.start(sessionState);
try {
sessionState.loadAuxJars();
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 277519c..57031f4 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -54,6 +54,7 @@ import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.server.HiveServer2;
+import org.apache.hive.service.server.KillQueryZookeeperManager;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,6 +86,7 @@ public class SessionManager extends CompositeService {
private int ipAddressLimit;
private int userIpAddressLimit;
private final OperationManager operationManager = new OperationManager();
+ private KillQueryZookeeperManager killQueryZookeeperManager;
private ThreadPoolExecutor backgroundOperationPool;
private boolean isOperationLogEnabled;
private File operationLogRootDir;
@@ -114,6 +116,12 @@ public class SessionManager extends CompositeService {
}
createBackgroundOperationPool();
addService(operationManager);
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) &&
+ !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE) &&
+ hiveConf.getBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) {
+ killQueryZookeeperManager = new KillQueryZookeeperManager(operationManager, hiveServer2);
+ addService(killQueryZookeeperManager);
+ }
initSessionImplClassName();
Metrics metrics = MetricsFactory.getInstance();
if(metrics != null){
@@ -625,6 +633,10 @@ public class SessionManager extends CompositeService {
return operationManager;
}
+ public KillQueryZookeeperManager getKillQueryZookeeperManager() {
+ return killQueryZookeeperManager;
+ }
+
private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>();
public static void setIpAddress(String ipAddress) {
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 3e7d127..42b7e59 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -645,7 +645,7 @@ public class HiveServer2 extends CompositeService {
return false;
}
- private String getServerInstanceURI() throws Exception {
+ public String getServerInstanceURI() throws Exception {
if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
}
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index 883e32b..e15cd1f 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -22,10 +22,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -55,14 +55,19 @@ public class KillQueryImpl implements KillQuery {
private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
private final OperationManager operationManager;
- private enum TagOrId {TAG, ID, UNKNOWN};
+ private final KillQueryZookeeperManager killQueryZookeeperManager;
- public KillQueryImpl(OperationManager operationManager) {
+ private enum TagOrId {TAG, ID, UNKNOWN}
+
+
+ public KillQueryImpl(OperationManager operationManager, KillQueryZookeeperManager killQueryZookeeperManager) {
this.operationManager = operationManager;
+ this.killQueryZookeeperManager = killQueryZookeeperManager;
}
- public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException {
- Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+ public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin)
+ throws IOException, YarnException {
+ Set<ApplicationId> childYarnJobs = new HashSet<>();
GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
gar.setScope(ApplicationsRequestScope.OWN);
gar.setApplicationTags(Collections.singleton(tag));
@@ -70,10 +75,13 @@ public class KillQueryImpl implements KillQuery {
ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
GetApplicationsResponse apps = proxy.getApplications(gar);
List<ApplicationReport> appsList = apps.getApplicationList();
- for(ApplicationReport appReport : appsList) {
- if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get()
- .getUserName())) {
+ for (ApplicationReport appReport : appsList) {
+ if (doAsAdmin) {
childYarnJobs.add(appReport.getApplicationId());
+ } else if (StringUtils.isNotBlank(doAs)) {
+ if (appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + doAs)) {
+ childYarnJobs.add(appReport.getApplicationId());
+ }
}
}
@@ -86,13 +94,13 @@ public class KillQueryImpl implements KillQuery {
return childYarnJobs;
}
- public static void killChildYarnJobs(Configuration conf, String tag) {
+ public static void killChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin) {
try {
if (tag == null) {
return;
}
LOG.info("Killing yarn jobs using query tag:" + tag);
- Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+ Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag, doAs, doAsAdmin);
if (!childYarnJobs.isEmpty()) {
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
@@ -102,7 +110,7 @@ public class KillQueryImpl implements KillQuery {
}
}
} catch (IOException | YarnException ye) {
- LOG.warn("Exception occurred while killing child job({})", ye);
+ LOG.warn("Exception occurred while killing child job({})", tag, ye);
}
}
@@ -110,76 +118,117 @@ public class KillQueryImpl implements KillQuery {
boolean isAdmin = false;
if (SessionState.get().getAuthorizerV2() != null) {
try {
- SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY,
- new ArrayList<HivePrivilegeObject>(), new ArrayList<HivePrivilegeObject>(),
- new HiveAuthzContext.Builder().build());
+ SessionState.get().getAuthorizerV2()
+ .checkPrivileges(HiveOperationType.KILL_QUERY, new ArrayList<>(),
+ new ArrayList<>(), new HiveAuthzContext.Builder().build());
isAdmin = true;
} catch (Exception e) {
+ LOG.warn("Error while checking privileges", e);
}
}
return isAdmin;
}
- private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws
- HiveSQLException {
- if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get()
- .getAuthenticator().getUserName())) {
+ private boolean cancelOperation(Operation operation, String doAs, boolean doAsAdmin, String errMsg)
+ throws HiveSQLException {
+ if (doAsAdmin || (!StringUtils.isBlank(doAs) && operation.getParentSession().getUserName().equals(doAs))) {
OperationHandle handle = operation.getHandle();
operationManager.cancelOperation(handle, errMsg);
return true;
- } else {
- return false;
}
+ return false;
+ }
+
+ public boolean isLocalQuery(String queryIdOrTag) {
+ TagOrId tagOrId = TagOrId.UNKNOWN;
+ if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
+ tagOrId = TagOrId.ID;
+ } else if (!operationManager.getOperationsByQueryTag(queryIdOrTag).isEmpty()) {
+ tagOrId = TagOrId.TAG;
+ }
+ return tagOrId != TagOrId.UNKNOWN;
}
@Override
public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException {
- try {
- TagOrId tagOrId = TagOrId.UNKNOWN;
- Set<Operation> operationsToKill = new HashSet<Operation>();
- if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
- operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag));
- tagOrId = TagOrId.ID;
- } else {
- operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag));
- if (!operationsToKill.isEmpty()) {
- tagOrId = TagOrId.TAG;
- }
+ killQuery(queryIdOrTag, errMsg, conf, false, SessionState.get().getUserName(), isAdmin());
+ }
+
+ public void killLocalQuery(String queryIdOrTag, HiveConf conf, String doAs, boolean doAsAdmin)
+ throws HiveException {
+ killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin);
+ }
+
+ private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean onlyLocal, String doAs,
+ boolean doAsAdmin) throws HiveException {
+ errMsg = StringUtils.defaultString(errMsg, KillQueriesOperation.KILL_QUERY_MESSAGE);
+ TagOrId tagOrId = TagOrId.UNKNOWN;
+ Set<Operation> operationsToKill = new HashSet<>();
+ if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
+ operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag));
+ tagOrId = TagOrId.ID;
+ LOG.debug("Query found with id: {}", queryIdOrTag);
+ } else {
+ operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag));
+ if (!operationsToKill.isEmpty()) {
+ tagOrId = TagOrId.TAG;
+ LOG.debug("Query found with tag: {}", queryIdOrTag);
}
- if (operationsToKill.isEmpty()) {
- LOG.info("Query not found: " + queryIdOrTag);
+ }
+ if (!operationsToKill.isEmpty()) {
+ killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin);
+ } else {
+ LOG.debug("Query not found with tag/id: {}", queryIdOrTag);
+ if (!onlyLocal && killQueryZookeeperManager != null &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) {
+ try {
+ LOG.debug("Killing query with zookeeper coordination: " + queryIdOrTag);
+ killQueryZookeeperManager
+ .killQuery(queryIdOrTag, SessionState.get().getAuthenticator().getUserName(), isAdmin());
+ } catch (IOException e) {
+ LOG.error("Kill query failed for queryId: " + queryIdOrTag, e);
+ throw new HiveException("Unable to kill query locally or on remote servers.", e);
+ }
+ } else {
+ LOG.warn("Unable to kill query with id {}", queryIdOrTag);
}
- boolean admin = isAdmin();
- switch(tagOrId) {
- case ID:
- Operation operation = operationsToKill.iterator().next();
- boolean canceled = cancelOperation(operation, admin, errMsg);
- if (canceled) {
- String queryTag = operation.getQueryTag();
- if (queryTag == null) {
- queryTag = queryIdOrTag;
- }
- killChildYarnJobs(conf, queryTag);
- } else {
- // no privilege to cancel
- throw new HiveSQLException("No privilege to kill query id");
- }
- break;
- case TAG:
- int numCanceled = 0;
- for (Operation operationToKill : operationsToKill) {
- if (cancelOperation(operationToKill, admin, errMsg)) {
- numCanceled++;
- }
+ }
+ }
+
+ private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, TagOrId tagOrId,
+ Set<Operation> operationsToKill, String doAs, boolean doAsAdmin) throws HiveException {
+ try {
+ switch (tagOrId) {
+ case ID:
+ Operation operation = operationsToKill.iterator().next();
+ boolean canceled = cancelOperation(operation, doAs, doAsAdmin, errMsg);
+ if (canceled) {
+ String queryTag = operation.getQueryTag();
+ if (queryTag == null) {
+ queryTag = queryIdOrTag;
}
- killChildYarnJobs(conf, queryIdOrTag);
- if (numCanceled == 0) {
- throw new HiveSQLException("No privilege to kill query tag");
+ killChildYarnJobs(conf, queryTag, doAs, doAsAdmin);
+ } else {
+ // no privilege to cancel
+ throw new HiveSQLException("No privilege to kill query id");
+ }
+ break;
+ case TAG:
+ int numCanceled = 0;
+ for (Operation operationToKill : operationsToKill) {
+ if (cancelOperation(operationToKill, doAs, doAsAdmin, errMsg)) {
+ numCanceled++;
}
- break;
- case UNKNOWN:
- killChildYarnJobs(conf, queryIdOrTag);
- break;
+ }
+ if (numCanceled == 0) {
+ throw new HiveSQLException("No privilege to kill query tag");
+ } else {
+ killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin);
+ }
+ break;
+ case UNKNOWN:
+ default:
+ break;
}
} catch (HiveSQLException e) {
LOG.error("Kill query failed for query " + queryIdOrTag, e);
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java b/service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java
new file mode 100644
index 0000000..396364b
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ZookeeperUtils;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.cli.operation.OperationManager;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Kill query coordination service.
+ * When service discovery is enabled, a local kill query can request a kill
+ * on every other HS2 server with the queryId or queryTag and wait for confirmation or denial.
+ * The communication is done through Zookeeper.
+ */
+public class KillQueryZookeeperManager extends AbstractService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KillQueryZookeeperManager.class);
+ private static final String SASL_LOGIN_CONTEXT_NAME = "KillQueryZooKeeperClient";
+ public static final int MAX_WAIT_ON_CONFIRMATION_SECONDS = 30;
+ public static final int MAX_WAIT_ON_KILL_SECONDS = 180;
+
+ private CuratorFramework zooKeeperClient;
+ private String zkPrincipal, zkKeytab, zkNameSpace;
+ private final KillQueryImpl localKillQueryImpl;
+ private final HiveServer2 hiveServer2;
+ private HiveConf conf;
+
+ // Path cache to watch queries to kill
+ private PathChildrenCache killQueryListener = null;
+
+ public KillQueryZookeeperManager(OperationManager operationManager, HiveServer2 hiveServer2) {
+ super(KillQueryZookeeperManager.class.getSimpleName());
+ this.hiveServer2 = hiveServer2;
+ localKillQueryImpl = new KillQueryImpl(operationManager, this);
+ }
+
+ @Override
+ public synchronized void init(HiveConf conf) {
+ this.conf = conf;
+ zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE);
+ Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace),
+ HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE.varname + " cannot be null or empty");
+ this.zkPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+ this.zkKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+ this.zooKeeperClient = conf.getZKConfig().getNewZookeeperClient(getACLProviderForZKPath("/" + zkNameSpace));
+ this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
+
+ super.init(conf);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ if (zooKeeperClient == null) {
+ throw new ServiceException("Failed start zookeeperClient in KillQueryZookeeperManager");
+ }
+ try {
+ ZookeeperUtils.setupZookeeperAuth(this.getHiveConf(), SASL_LOGIN_CONTEXT_NAME, zkPrincipal, zkKeytab);
+ zooKeeperClient.start();
+ try {
+ zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + zkNameSpace);
+ if (ZookeeperUtils.isKerberosEnabled(conf)) {
+ zooKeeperClient.setACL().withACL(createSecureAcls()).forPath("/" + zkNameSpace);
+ }
+ LOG.info("Created the root namespace: " + zkNameSpace + " on ZooKeeper");
+ } catch (KeeperException e) {
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ LOG.error("Unable to create namespace: " + zkNameSpace + " on ZooKeeper", e);
+ throw e;
+ }
+ }
+ // Create a path cache and start to listen for every kill query request from other servers.
+ killQueryListener = new PathChildrenCache(zooKeeperClient, "/" + zkNameSpace, false);
+ killQueryListener.start(PathChildrenCache.StartMode.NORMAL);
+ startListeningForQueries();
+ // Init closeable utils in case register is not called (see HIVE-13322)
+ CloseableUtils.class.getName();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed start zookeeperClient in KillQueryZookeeperManager", e);
+ }
+ LOG.info("KillQueryZookeeperManager service started.");
+ }
+
+ private ACLProvider getACLProviderForZKPath(String zkPath) {
+ final boolean isSecure = ZookeeperUtils.isKerberosEnabled(conf);
+ return new ACLProvider() {
+ @Override
+ public List<ACL> getDefaultAcl() {
+ // We always return something from getAclForPath so this should not happen.
+ LOG.warn("getDefaultAcl was called");
+ return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public List<ACL> getAclForPath(String path) {
+ if (!isSecure || path == null || !path.contains(zkPath)) {
+ // No security or the path is below the user path - full access.
+ return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+ return createSecureAcls();
+ }
+ };
+ }
+
+ private static List<ACL> createSecureAcls() {
+ // Read all to the world
+ List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
+ // Create/Delete/Write/Admin to creator
+ nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
+ return nodeAcls;
+ }
+
+ private void startListeningForQueries() {
+ PathChildrenCacheListener listener = (client, pathChildrenCacheEvent) -> {
+ if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
+ KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace,
+ ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath()));
+ Optional<KillQueryZookeeperData> data = barrier.getKillQueryData();
+ if (!data.isPresent()) {
+ return;
+ }
+ KillQueryZookeeperData killQuery = data.get();
+ LOG.debug("Kill query request with id {}", killQuery.getQueryId());
+ if (getServerHost().equals(killQuery.getRequestingServer())) {
+ // The listener was called for the server who posted the request
+ return;
+ }
+ if (localKillQueryImpl.isLocalQuery(killQuery.getQueryId())) {
+ LOG.info("Killing query with id {}", killQuery.getQueryId());
+ barrier.confirmProgress(getServerHost());
+ try {
+ localKillQueryImpl
+ .killLocalQuery(killQuery.getQueryId(), conf, killQuery.getDoAs(), killQuery.isDoAsAdmin());
+ barrier.confirmDone(getServerHost());
+ } catch (Exception e) {
+ LOG.error("Unable to kill local query", e);
+ barrier.confirmFailed(getServerHost());
+ }
+ } else {
+ LOG.debug("Confirm unknown kill query request with id {}", killQuery.getQueryId());
+ barrier.confirmNo(getServerHost());
+ }
+ }
+ };
+ LOG.info("Start to listen for kill query requests.");
+ killQueryListener.getListenable().addListener(listener);
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ LOG.info("Stopping KillQueryZookeeperManager service.");
+ CloseableUtils.closeQuietly(killQueryListener);
+ CloseableUtils.closeQuietly(zooKeeperClient);
+ }
+
+ private List<String> getAllServerUrls() {
+ List<String> serverHosts = new ArrayList<>();
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) && !conf
+ .getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE)) {
+ String zooKeeperNamespace = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+ try {
+ serverHosts.addAll(zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace));
+ } catch (Exception e) {
+ LOG.error("Unable the get available server hosts", e);
+ }
+ }
+ return serverHosts;
+ }
+
+ private String getServerHost() {
+ if (hiveServer2 == null) {
+ return "";
+ }
+ try {
+ return removeDelimiter(hiveServer2.getServerInstanceURI());
+ } catch (Exception e) {
+ LOG.error("Unable to determine the server host", e);
+ return "";
+ }
+ }
+
+ // for debugging
+ private static class ZkConnectionStateListener implements ConnectionStateListener {
+ @Override
+ public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
+ LOG.info("Connection state change notification received. State: {}", connectionState);
+ }
+ }
+
+ /**
+ * Posts a kill query request on ZooKeeper for the other HS2 instances. If service discovery is not enabled or
+ * no other server is registered, it does nothing. Otherwise it posts the kill query request on ZooKeeper
+ * and waits for the other instances to confirm the kill or deny it.
+ *
+ * @param queryIdOrTag queryId or tag to kill
+ * @param doAs user requesting the kill
+ * @param doAsAdmin admin user requesting the kill (with KILLQUERY privilege)
+ * @throws IOException If the kill query failed
+ */
+ public void killQuery(String queryIdOrTag, String doAs, boolean doAsAdmin) throws IOException {
+ List<String> serverHosts = getAllServerUrls();
+ if (serverHosts.size() < 2) {
+ return;
+ }
+ KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace);
+ boolean result;
+ try {
+ barrier.setBarrier(queryIdOrTag, hiveServer2.getServerInstanceURI(), doAs, doAsAdmin);
+ LOG.info("Created kill query barrier in path: {} for queryId: {}", barrier.getBarrierPath(), queryIdOrTag);
+ result = barrier.waitOnBarrier(serverHosts.size() - 1, MAX_WAIT_ON_CONFIRMATION_SECONDS,
+ MAX_WAIT_ON_KILL_SECONDS, TimeUnit.SECONDS);
+
+ } catch (Exception e) {
+ LOG.error("Unable to create Barrier on Zookeeper for KillQuery", e);
+ throw new IOException(e);
+ }
+ if (!result) {
+ throw new IOException("Unable to kill query on remote servers");
+ }
+ }
+
+ /**
+ * Data to post to Zookeeper for a kill query request. The fields will be serialized with ':' delimiter.
+ * In requestingServer every ':' will be escaped. Other fields can not contain any ':'.
+ */
+ public static class KillQueryZookeeperData {
+ private String queryId;
+ private String requestingServer;
+ private String doAs;
+ private boolean doAsAdmin;
+
+ public KillQueryZookeeperData(String queryId, String requestingServer, String doAs, boolean doAsAdmin) {
+ if (!StringUtils.equals(queryId, removeDelimiter(queryId))) {
+ throw new IllegalArgumentException("QueryId cannot contain any ':' character.");
+ }
+ this.queryId = queryId;
+ this.requestingServer = removeDelimiter(requestingServer);
+ if (!StringUtils.equals(doAs, removeDelimiter(doAs))) {
+ throw new IllegalArgumentException("doAs cannot contain any ':' character.");
+ }
+ this.doAs = doAs;
+ this.doAsAdmin = doAsAdmin;
+ }
+
+ public KillQueryZookeeperData(String data) {
+ if (data == null) {
+ return;
+ }
+
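+ // Expected serialized form (see toString()): queryId:requestingServer:doAs:doAsAdmin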
+ String[] elem = data.split(":");
+ queryId = elem[0];
+ requestingServer = elem[1];
+ doAs = elem[2];
+ doAsAdmin = Boolean.parseBoolean(elem[3]);
+ }
+
+ @Override
+ public String toString() {
+ return queryId + ":" + requestingServer + ":" + doAs + ":" + doAsAdmin;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public String getRequestingServer() {
+ return requestingServer;
+ }
+
+ public String getDoAs() {
+ return doAs;
+ }
+
+ public boolean isDoAsAdmin() {
+ return doAsAdmin;
+ }
+ }
+
+ /**
+ * Zookeeper Barrier for the KillQuery Operation.
+ * It posts a kill query request on Zookeeper and waits until the given number of service instances respond.
+ * Implementation is based on org.apache.curator.framework.recipes.barriers.DistributedBarrier.
+ */
+ public static class KillQueryZookeeperBarrier {
+ private final CuratorFramework client;
+ private final String barrierPath;
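+ // Registered on the barrier's children in waitOnBarrier(); any change wakes up the thread blocked in wait() there.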
+ private final Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ client.postSafeNotify(KillQueryZookeeperBarrier.this);
+ }
+ };
+
+ /**
+ * @param client client
+ * @param barrierRootPath rootPath to put the barrier
+ */
+ public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath) {
+ this(client, barrierRootPath, UUID.randomUUID().toString());
+ }
+
+ /**
+ * @param client client
+ * @param barrierRootPath rootPath to put the barrier
+ * @param barrierPath name of the barrier
+ */
+ public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath, String barrierPath) {
+ this.client = client;
+ this.barrierPath = PathUtils.validatePath(barrierRootPath + "/" + barrierPath);
+ }
+
+ public String getBarrierPath() {
+ return barrierPath;
+ }
+
+ /**
+ * Utility to set the barrier node.
+ *
+ * @throws Exception errors
+ */
+ public synchronized void setBarrier(String queryId, String requestingServer, String doAs, boolean doAsAdmin)
+ throws Exception {
+ try {
+ KillQueryZookeeperData data = new KillQueryZookeeperData(queryId, requestingServer, doAs, doAsAdmin);
+ client.create().creatingParentContainersIfNeeded()
+ .forPath(barrierPath, data.toString().getBytes(StandardCharsets.UTF_8));
+ } catch (KeeperException.NodeExistsException e) {
+ throw new IllegalStateException("Barrier with this path already exists");
+ }
+ }
+
+ public synchronized Optional<KillQueryZookeeperData> getKillQueryData() throws Exception {
+ if (client.checkExists().forPath(barrierPath) != null) {
+ byte[] data = client.getData().forPath(barrierPath);
+ return Optional.of(new KillQueryZookeeperData(new String(data, StandardCharsets.UTF_8)));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Confirm that the query with the queryId in the barrier is not known on this server.
+ *
+ * @param serverId The serverHost confirming the request
+ * @throws Exception If confirmation failed
+ */
+ public synchronized void confirmNo(String serverId) throws Exception {
+ if (client.checkExists().forPath(barrierPath) != null) {
+ client.create().forPath(barrierPath + "/NO:" + serverId);
+ } else {
+ throw new IllegalStateException("Barrier is not initialised");
+ }
+ }
+
+ /**
+ * Confirm that the query with the queryId in the barrier is known and the kill query process has started.
+ *
+ * @param serverId The serverHost confirming the request
+ * @throws Exception If confirmation failed
+ */
+ public synchronized void confirmProgress(String serverId) throws Exception {
+ if (client.checkExists().forPath(barrierPath) != null) {
+ client.create().forPath(barrierPath + "/PROGRESS:" + serverId);
+ } else {
+ throw new IllegalStateException("Barrier is not initialised");
+ }
+ }
+
+ /**
+ * Confirm killing the query with the queryId in the barrier.
+ *
+ * @param serverId The serverHost confirming the request
+ * @throws Exception If confirmation failed
+ */
+ public synchronized void confirmDone(String serverId) throws Exception {
+ if (client.checkExists().forPath(barrierPath) != null) {
+ if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
+ client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
+ }
+ client.create().forPath(barrierPath + "/DONE:" + serverId);
+ } else {
+ throw new IllegalStateException("Barrier is not initialised");
+ }
+ }
+
+ /**
+ * Confirm failure of killing the query with the queryId in the barrier.
+ *
+ * @param serverId The serverHost confirming the request
+ * @throws Exception If confirmation failed
+ */
+ public synchronized void confirmFailed(String serverId) throws Exception {
+ if (client.checkExists().forPath(barrierPath) != null) {
+ if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
+ client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
+ }
+ client.create().forPath(barrierPath + "/FAILED:" + serverId);
+ } else {
+ throw new IllegalStateException("Barrier is not initialised");
+ }
+ }
+
+ /**
+ * Wait for every server to either confirm killing the query or confirm not knowing the query.
+ *
+ * @param confirmationCount number of confirmations to wait for
+ * @param maxWaitOnConfirmation confirmation waiting timeout for NO answers
+ * @param maxWaitOnKill timeout for waiting on the actual kill query operation
+ * @param unit time unit for timeouts
+ * @return true if the kill was confirmed, false on timeout or if every server answered NO
+ * @throws Exception If confirmation failed
+ */
+ public synchronized boolean waitOnBarrier(int confirmationCount, long maxWaitOnConfirmation, long maxWaitOnKill,
+ TimeUnit unit) throws Exception {
+ long startMs = System.currentTimeMillis();
+ long startKill = -1;
+ long maxWaitMs = TimeUnit.MILLISECONDS.convert(maxWaitOnConfirmation, unit);
+ long maxWaitOnKillMs = TimeUnit.MILLISECONDS.convert(maxWaitOnKill, unit);
+
+ boolean progress = false;
+ boolean result = false;
+ while (true) {
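+ // Each responding server creates a child znode named <ANSWER>:<serverId>, where ANSWER is NO, PROGRESS, DONE or FAILED.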
+ List<String> children = client.getChildren().usingWatcher(watcher).forPath(barrierPath);
+ boolean concluded = false;
+ for (String child : children) {
+ if (child.startsWith("DONE")) {
+ result = true;
+ concluded = true;
+ break;
+ }
+ if (child.startsWith("FAILED")) {
+ concluded = true;
+ break;
+ }
+ if (child.startsWith("PROGRESS")) {
+ progress = true;
+ }
+ }
+ if (concluded) {
+ break;
+ }
+ if (progress) {
+ // Wait for the kill query to finish
+ if (startKill < 0) {
+ startKill = System.currentTimeMillis();
+ }
+ long elapsed = System.currentTimeMillis() - startKill;
+ long thisWaitMs = maxWaitOnKillMs - elapsed;
+ if (thisWaitMs <= 0) {
+ break;
+ }
+ wait(thisWaitMs);
+ } else {
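+ // No server reported PROGRESS, DONE or FAILED so far; if all of them have already answered, every answer was NO.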
+ if (children.size() == confirmationCount) {
+ result = false;
+ break;
+ }
+ // Wait for confirmation
+ long elapsed = System.currentTimeMillis() - startMs;
+ long thisWaitMs = maxWaitMs - elapsed;
+ if (thisWaitMs <= 0) {
+ break;
+ }
+ wait(thisWaitMs);
+ }
+
+ }
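+ // Remove the barrier node together with all confirmation children, regardless of the outcome.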
+ client.delete().deletingChildrenIfNeeded().forPath(barrierPath);
+ return result;
+ }
+ }
+
+ private static String removeDelimiter(String in) {
+ if (in == null) {
+ return null;
+ }
+ return in.replaceAll(":", "");
+ }
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
index 71d8651..1e35795 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
@@ -29,6 +29,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryOneTime;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -337,6 +338,9 @@ public class ZooKeeperHiveHelper {
}
if (maxRetries > 0) {
builder = builder.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
+ } else {
+ // Retry policy is mandatory
+ builder = builder.retryPolicy(new RetryOneTime(1000));
}
if (zooKeeperAclProvider != null) {
builder = builder.aclProvider(zooKeeperAclProvider);