Posted to commits@hive.apache.org by sz...@apache.org on 2020/04/14 09:29:04 UTC

[hive] branch master updated: HIVE-23084: Implement kill query in multiple HS2 environment (Peter Varga, reviewed by Adam Szita)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fcac603  HIVE-23084: Implement kill query in multiple HS2 environment (Peter Varga, reviewed by Adam Szita)
fcac603 is described below

commit fcac6034f4ba36a7d12d4f6af8be09511fedcaa6
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Tue Apr 14 11:20:50 2020 +0200

    HIVE-23084: Implement kill query in multiple HS2 environment (Peter Varga, reviewed by Adam Szita)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +
 .../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java |   3 +
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java       |   7 +
 .../hive/jdbc/TestJdbcWithServiceDiscovery.java    | 326 +++++++++++++
 .../thrift/TestMiniHS2StateWithNoZookeeper.java    |   5 +-
 .../server/TestKillQueryZookeeperManager.java      | 211 +++++++++
 .../java/org/apache/hive/jdbc/miniHS2/MiniHS2.java |   4 +
 .../java/org/apache/hive/jdbc/HiveStatement.java   |   5 +-
 .../ql/ddl/process/kill/KillQueriesOperation.java  |   4 +-
 .../hadoop/hive/ql/exec/tez/TezSessionState.java   |   7 +-
 .../hive/service/cli/session/HiveSessionImpl.java  |   7 +-
 .../hive/service/cli/session/SessionManager.java   |  12 +
 .../apache/hive/service/server/HiveServer2.java    |   2 +-
 .../apache/hive/service/server/KillQueryImpl.java  | 173 ++++---
 .../service/server/KillQueryZookeeperManager.java  | 525 +++++++++++++++++++++
 .../hadoop/hive/common/ZooKeeperHiveHelper.java    |   4 +
 16 files changed, 1230 insertions(+), 70 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a08dd03..7b3acad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2697,6 +2697,11 @@ public class HiveConf extends Configuration {
         "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
             "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
              "system property (note the camelCase)."),
+    HIVE_ZOOKEEPER_KILLQUERY_ENABLE("hive.zookeeper.killquery.enable", true,
+        "Whether enabled kill query coordination with zookeeper, " +
+            "when hive.server2.support.dynamic.service.discovery is enabled."),
+    HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE("hive.zookeeper.killquery.namespace", "killQueries",
+        "When kill query coordination is enabled, uses this namespace for registering queries to kill with zookeeper"),
 
     // Transactions
     HIVE_TXN_MANAGER("hive.txn.manager",
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 3973ec9..45b22f9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -784,6 +785,8 @@ public abstract class BaseJdbcWithMiniLlap {
     con2.close();
 
     assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+        tExecuteHolder.throwable.getMessage());
     assertNull("tCancel", tKillHolder.throwable);
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index 68a515c..1aab03d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -31,6 +31,7 @@ import java.util.List;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
 import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -412,6 +413,8 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false,
             tExecuteHolder, tKillHolder);
     assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+        tExecuteHolder.throwable.getMessage());
     assertNull("tCancel", tKillHolder.throwable);
   }
 
@@ -431,6 +434,8 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     ExceptionHolder tKillHolder = new ExceptionHolder();
     testKillQueryInternal("user1", System.getProperty("user.name"), true, tExecuteHolder, tKillHolder);
     assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+        tExecuteHolder.throwable.getMessage());
     assertNull("tCancel", tKillHolder.throwable);
   }
 
@@ -440,6 +445,8 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     ExceptionHolder tKillHolder = new ExceptionHolder();
     testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder);
     assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+        tExecuteHolder.throwable.getMessage());
     assertNull("tCancel", tKillHolder.throwable);
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java
new file mode 100644
index 0000000..1621e7e
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the JDBC driver when two HS2 instances are running with service discovery enabled.
+ */
+public class TestJdbcWithServiceDiscovery {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestJdbcWithServiceDiscovery.class);
+  private static final String TABLE_NAME = "testJdbcMinihs2Tbl";
+  private static final String DB_NAME = "testJdbcMinihs2";
+  private static final String REMOTE_ERROR_MESSAGE = "Unable to kill query locally or on remote servers.";
+
+  private static TestingServer zkServer;
+  private static MiniHS2 miniHS2server1;
+  private static MiniHS2 miniHS2server2;
+  private static String miniHS2directUrl1;
+  private static String miniHS2directUrl2;
+  private static Path kvDataFilePath;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    MiniHS2.cleanupLocalDir();
+    zkServer = new TestingServer();
+
+    // Create one MiniHS2 with Tez and one with Local FS only
+    HiveConf hiveConf1 = getTezConf();
+    HiveConf hiveConf2 = new HiveConf();
+
+    setSDConfigs(hiveConf1);
+    setSDConfigs(hiveConf2);
+
+    miniHS2server1 = new MiniHS2.Builder().withConf(hiveConf1).withMiniTez().build();
+    miniHS2server2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
+
+    Class.forName(MiniHS2.getJdbcDriverName());
+    String instanceId1 = UUID.randomUUID().toString();
+    miniHS2server1.start(getConfOverlay(instanceId1));
+    miniHS2directUrl1 =
+        "jdbc:hive2://" + miniHS2server1.getHost() + ":" + miniHS2server1.getBinaryPort() + "/" + DB_NAME;
+    String instanceId2 = UUID.randomUUID().toString();
+    miniHS2server2.start(getConfOverlay(instanceId2));
+    miniHS2directUrl2 =
+        "jdbc:hive2://" + miniHS2server2.getHost() + ":" + miniHS2server2.getBinaryPort() + "/" + DB_NAME;
+
+    String dataFileDir = hiveConf1.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
+    setupDb();
+  }
+
+  /**
+   * SleepMsUDF.
+   */
+  public static class SleepMsUDF extends UDF {
+    public Integer evaluate(int value, int ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // No-op
+      }
+      return value;
+    }
+  }
+
+  public static void setupDb() throws Exception {
+    Connection conDefault = DriverManager
+        .getConnection("jdbc:hive2://" + miniHS2server1.getHost() + ":" + miniHS2server1.getBinaryPort() + "/default",
+            System.getProperty("user.name"), "bar");
+    Statement stmt = conDefault.createStatement();
+    String tblName = DB_NAME + "." + TABLE_NAME;
+    stmt.execute("drop database if exists " + DB_NAME + " cascade");
+    stmt.execute("create database " + DB_NAME);
+    stmt.execute("use " + DB_NAME);
+    stmt.execute("create table " + tblName + " (int_col int, value string) ");
+    stmt.execute("load data local inpath '" + kvDataFilePath.toString() + "' into table " + tblName);
+    stmt.execute("grant select on table " + tblName + " to role public");
+
+    stmt.close();
+    conDefault.close();
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if ((miniHS2server1 != null) && miniHS2server1.isStarted()) {
+      try {
+        miniHS2server1.stop();
+      } catch (Exception e) {
+        LOG.warn("Error why shutting down Hs2", e);
+      }
+    }
+    if ((miniHS2server2 != null) && miniHS2server2.isStarted()) {
+      try {
+        miniHS2server2.stop();
+      } catch (Exception e) {
+        LOG.warn("Error why shutting down Hs2", e);
+      }
+    }
+    if (zkServer != null) {
+      zkServer.close();
+      zkServer = null;
+    }
+    MiniHS2.cleanupLocalDir();
+  }
+
+  private static HiveConf getTezConf() throws Exception {
+    String confDir = "../../data/conf/tez/";
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+    HiveConf defaultConf = new HiveConf();
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml"));
+    return defaultConf;
+  }
+
+  private static void setSDConfigs(HiveConf conf) {
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true);
+    conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE, false);
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, 2, TimeUnit.SECONDS);
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 100, TimeUnit.MILLISECONDS);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES, 1);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE, true);
+  }
+
+  private static Map<String, String> getConfOverlay(final String instanceId) {
+    Map<String, String> confOverlay = new HashMap<>();
+    confOverlay.put("hive.server2.zookeeper.publish.configs", "true");
+    confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId);
+    return confOverlay;
+  }
+
+  private static class ExceptionHolder {
+    Throwable throwable;
+  }
+
+  private void executeQueryAndKill(Connection con1, Connection con2, ExceptionHolder tExecuteHolder,
+      ExceptionHolder tKillHolder) throws SQLException, InterruptedException {
+    final HiveStatement stmt = (HiveStatement) con1.createStatement();
+    final Statement stmt2 = con2.createStatement();
+    final StringBuffer stmtQueryId = new StringBuffer();
+
+    // Thread executing the query
+    Thread tExecute = new Thread(() -> {
+      try {
+        LOG.info("Executing waiting query.");
+        // The test table has 500 rows and is self-joined; sleeping 10 ms per result row keeps the query running long enough to be killed
+        stmt.executeAsync(
+            "select sleepMsUDF(t1.int_col, 10), t1.int_col, t2.int_col " + "from " + TABLE_NAME + " t1 join "
+                + TABLE_NAME + " t2 on t1.int_col = t2.int_col");
+        stmtQueryId.append(stmt.getQueryId());
+        stmt.getUpdateCount();
+      } catch (SQLException e) {
+        tExecuteHolder.throwable = e;
+      }
+    });
+
+    tExecute.start();
+
+    // Wait for the other thread to create the statement handle
+    int count = 0;
+    while (count < 10) {
+      try {
+        Thread.sleep(2000);
+        String queryId;
+        if (stmtQueryId.length() != 0) {
+          queryId = stmtQueryId.toString();
+        } else {
+          count++;
+          continue;
+        }
+
+        LOG.info("Killing query: " + queryId);
+        stmt2.execute("kill query '" + queryId + "'");
+        stmt2.close();
+        break;
+      } catch (SQLException e) {
+        LOG.warn("Exception when kill query", e);
+        tKillHolder.throwable = e;
+        break;
+      }
+    }
+
+    tExecute.join();
+    try {
+      stmt.close();
+      con1.close();
+      con2.close();
+    } catch (Exception e) {
+      LOG.warn("Exception when close stmt and con", e);
+    }
+  }
+
+  @Test
+  public void testKillQueryWithSameServer() throws Exception {
+    Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+    Connection con2 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+
+    Statement stmt = con1.createStatement();
+    stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+    stmt.close();
+
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder);
+
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertEquals("Query was cancelled. User invoked KILL QUERY", tExecuteHolder.throwable.getMessage());
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
+  @Test
+  public void testKillQueryWithDifferentServer() throws Exception {
+    Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+    Connection con2 = DriverManager.getConnection(miniHS2directUrl2, System.getProperty("user.name"), "bar");
+
+    Statement stmt = con1.createStatement();
+    stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+    stmt.close();
+
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder);
+
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE,
+        tExecuteHolder.throwable.getMessage());
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
+  @Test
+  public void testKillQueryWithDifferentServerZKTurnedOff() throws Exception {
+    Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+    Connection con2 = DriverManager.getConnection(miniHS2directUrl2, System.getProperty("user.name"), "bar");
+
+    Statement stmt = con1.createStatement();
+    stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+    stmt.close();
+
+    stmt = con2.createStatement();
+    stmt.execute("set hive.zookeeper.killquery.enable = false");
+    stmt.close();
+
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder);
+
+    assertNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
+  @Test
+  public void testKillQueryWithRandomId() throws Exception {
+    Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar");
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    Statement stmt = con1.createStatement();
+    String queryId = "randomId123";
+    try {
+      LOG.info("Killing query: " + queryId);
+      stmt.execute("kill query '" + queryId + "'");
+      stmt.close();
+    } catch (SQLException e) {
+      LOG.warn("Exception when kill query", e);
+      tKillHolder.throwable = e;
+    }
+    try {
+      con1.close();
+    } catch (Exception e) {
+      LOG.warn("Exception when close stmt and con", e);
+    }
+
+    assertNotNull("tCancel", tKillHolder.throwable);
+    assertTrue(tKillHolder.throwable.getMessage(), tKillHolder.throwable.getMessage().contains(REMOTE_ERROR_MESSAGE));
+  }
+}
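
The test above drives the whole flow; condensed, the client-side pattern looks like the following sketch (connections, query text, and variable names are illustrative, not part of this commit):

    // con1 runs the query on one HS2 instance; con2 may point at a different
    // instance registered under the same ZooKeeper service discovery namespace.
    HiveStatement stmt = (HiveStatement) con1.createStatement();
    stmt.executeAsync("select sleepMsUDF(int_col, 10), int_col from some_table");
    String queryId = stmt.getQueryId(); // server-assigned query id

    // From any other HS2 instance behind the same ZooKeeper ensemble:
    Statement killer = con2.createStatement();
    killer.execute("kill query '" + queryId + "'");

    // The victim statement then fails with SQLState 01000 and the message
    // "Query was cancelled. User invoked KILL QUERY".
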
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java
index 99e681e..0df3058 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java
@@ -50,11 +50,14 @@ public class TestMiniHS2StateWithNoZookeeper {
   private static HiveConf hiveConf = null;
 
   @BeforeClass
-  public static void beforeTest() throws Exception   { 
+  public static void beforeTest() throws Exception   {
+    MiniHS2.cleanupLocalDir();
     hiveConf = new HiveConf();
     hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true);
     hiveConf.setIntVar(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES, 0);
     hiveConf.setTimeVar(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 0, TimeUnit.MILLISECONDS);
+    // Disable kill query coordination; this way only the HS2 start will fail, not the SessionManager service
+    hiveConf.setBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE, false);
     miniHS2 = new MiniHS2(hiveConf);
     Map<String, String> confOverlay = new HashMap<String, String>();
     try {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java
new file mode 100644
index 0000000..d9997a9
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link KillQueryZookeeperManager}.
+ */
+public class TestKillQueryZookeeperManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestKillQueryZookeeperManager.class);
+  private static final String BARRIER_ROOT_PATH = "/killqueries";
+  private static final String QUERYID = "QUERY1";
+  private static final String SERVER1 = "localhost:1234";
+  private static final String SERVER2 = "localhost:1235";
+  private static final String USER = "user";
+  private static final int TIMEOUT = 1000;
+
+  TestingServer server;
+
+  @Before
+  public void setupZookeeper() throws Exception {
+    server = new TestingServer();
+  }
+
+  @After
+  public void shutdown() {
+    if (server != null) {
+      CloseableUtils.closeQuietly(server);
+    }
+  }
+
+  private CuratorFramework getClient() {
+    return CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(TIMEOUT * 100)
+        .connectionTimeoutMs(TIMEOUT).retryPolicy(new RetryOneTime(1)).build();
+  }
+
+  @Test
+  public void testBarrierServerCrash() throws Exception {
+    try (CuratorFramework client = getClient()) {
+      client.start();
+      client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+      final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+          new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+      barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+      final ExecutorService service = Executors.newSingleThreadExecutor();
+      Future<Object> future = service.submit(() -> {
+        Thread.sleep(TIMEOUT / 2);
+        server.stop();
+        return null;
+      });
+
+      barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS);
+      future.get();
+      Assert.fail();
+    } catch (KeeperException.ConnectionLossException expected) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testNoBarrier() throws Exception {
+    try (CuratorFramework client = getClient()) {
+      client.start();
+      client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+      IllegalStateException result = null;
+      final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+          new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+      try {
+        barrier.confirmProgress(SERVER1);
+      } catch (IllegalStateException e) {
+        result = e;
+      }
+      Assert.assertNotNull(result);
+      Assert.assertEquals("Barrier is not initialised", result.getMessage());
+    }
+  }
+
+  @Test
+  public void testNo() throws Exception {
+    try (CuratorFramework client = getClient()) {
+      client.start();
+      client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+      final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+          new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+      barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+      ExecutorService service = Executors.newSingleThreadExecutor();
+      service.submit(() -> {
+        Thread.sleep(TIMEOUT / 2);
+        barrier.confirmNo(SERVER2);
+        return null;
+      });
+
+      Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+    }
+  }
+
+  @Test
+  public void testDone() throws Exception {
+    try (CuratorFramework client = getClient()) {
+      client.start();
+      client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+      final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+          new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+      barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+      ExecutorService service = Executors.newSingleThreadExecutor();
+      service.submit(() -> {
+        Thread.sleep(TIMEOUT / 2);
+        try {
+          barrier.confirmProgress(SERVER2);
+          Thread.sleep(TIMEOUT / 2);
+          barrier.confirmDone(SERVER2);
+        } catch (Exception e) {
+          LOG.error("Confirmation error", e);
+        }
+        return null;
+      });
+
+      Assert.assertTrue(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS));
+    }
+  }
+
+  @Test
+  public void testFailed() throws Exception {
+    try (CuratorFramework client = getClient()) {
+      client.start();
+      client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+      final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+          new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+      barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+      ExecutorService service = Executors.newSingleThreadExecutor();
+      service.submit(() -> {
+        Thread.sleep(TIMEOUT / 2);
+        barrier.confirmProgress(SERVER2);
+        Thread.sleep(TIMEOUT / 2);
+        barrier.confirmFailed(SERVER2);
+        return null;
+      });
+
+      Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+    }
+  }
+
+  @Test
+  public void testConfirmTimeout() throws Exception {
+    try (CuratorFramework client = getClient()) {
+      client.start();
+      client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+      final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+          new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+      barrier.setBarrier(QUERYID, SERVER1, USER, true);
+
+      Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+    }
+  }
+
+  @Test
+  public void testKillTimeout() throws Exception {
+    try (CuratorFramework client = getClient()) {
+      client.start();
+      client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH);
+      final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
+          new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH);
+      barrier.setBarrier(QUERYID, SERVER1, USER, true);
+      ExecutorService service = Executors.newSingleThreadExecutor();
+      service.submit(() -> {
+        Thread.sleep(TIMEOUT / 2);
+        barrier.confirmProgress(SERVER2);
+        // server died
+        return null;
+      });
+      Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS));
+    }
+  }
+}
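
Taken together, these tests outline a two-party handshake over the barrier; restated in one sketch (server addresses, timeouts, and the client setup are illustrative, error handling elided):

    // Requesting server: publish the kill request and wait for the outcome.
    KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier =
        new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, "/killqueries");
    barrier.setBarrier("QUERY1", "localhost:1234", "user", true);
    boolean killed = barrier.waitOnBarrier(1, 1000, 2000, TimeUnit.MILLISECONDS);

    // Responding server, watching the same path. Either it owns the query:
    barrier.confirmProgress("localhost:1235"); // found it, kill in progress
    barrier.confirmDone("localhost:1235");     // kill succeeded -> waitOnBarrier returns true
    // or it reports failure / does not own it:
    barrier.confirmFailed("localhost:1235");   // kill failed   -> waitOnBarrier returns false
    barrier.confirmNo("localhost:1235");       // not my query  -> waitOnBarrier returns false
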
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index eb80086..7f25c74 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -115,6 +115,10 @@ public class MiniHS2 extends AbstractHiveService {
       this.miniClusterType = MiniClusterType.MR;
       return this;
     }
+    public Builder withMiniTez() {
+      this.miniClusterType = MiniClusterType.TEZ;
+      return this;
+    }
 
     public Builder withMiniKdc(String serverPrincipal, String serverKeytab) {
       this.useMiniKdc = true;
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index db965e7..543bf8c 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -63,6 +63,7 @@ import java.util.Objects;
 public class HiveStatement implements java.sql.Statement {
   public static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class.getName());
 
+  public static final String QUERY_CANCELLED_MESSAGE = "Query was cancelled.";
   private static final int DEFAULT_FETCH_SIZE =
       HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;
 
@@ -394,9 +395,9 @@ public class HiveStatement implements java.sql.Statement {
             // 01000 -> warning
             String errMsg = statusResp.getErrorMessage();
             if (errMsg != null && !errMsg.isEmpty()) {
-              throw new SQLException("Query was cancelled. " + errMsg, "01000");
+              throw new SQLException(QUERY_CANCELLED_MESSAGE + " " + errMsg, "01000");
             } else {
-              throw new SQLException("Query was cancelled", "01000");
+              throw new SQLException(QUERY_CANCELLED_MESSAGE, "01000");
             }
           case TIMEDOUT_STATE:
             throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds");
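
Because the cancellation text is now a shared constant, callers can recognize a kill without hardcoded string literals; a hedged sketch of client-side detection (the surrounding statement handling is illustrative):

    try {
      stmt.getUpdateCount(); // any call that surfaces the async query failure
    } catch (SQLException e) {
      // "01000" is the warning SQLState used above for cancelled queries.
      if ("01000".equals(e.getSQLState())
          && e.getMessage().startsWith(HiveStatement.QUERY_CANCELLED_MESSAGE)) {
        // The query was killed, possibly via KILL QUERY on another HS2 instance.
      }
    }
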
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java
index afde1a4..26c7fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java
@@ -31,11 +31,13 @@ public class KillQueriesOperation extends DDLOperation<KillQueriesDesc> {
     super(context, desc);
   }
 
+  public static final String KILL_QUERY_MESSAGE = "User invoked KILL QUERY";
+
   @Override
   public int execute() throws HiveException {
     SessionState sessionState = SessionState.get();
     for (String queryId : desc.getQueryIds()) {
-      sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY", context.getDb().getConf());
+      sessionState.getKillQuery().killQuery(queryId, KILL_QUERY_MESSAGE, context.getDb().getConf());
     }
     LOG.info("kill query called ({})", desc.getQueryIds());
     return 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 8becef1..5b3fa8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -842,8 +842,11 @@ public class TezSessionState {
 
   private void addJarLRByClass(Class<?> clazz, final Map<String, LocalResource> lrMap) throws IOException,
       LoginException {
-    final File jar =
-        new File(Utilities.jarFinderGetJar(clazz));
+    String jarPath = Utilities.jarFinderGetJar(clazz);
+    if (jarPath == null) {
+      throw new IOException("Can't find jar for: " + clazz);
+    }
+    final File jar = new File(jarPath);
     final String localJarPath = jar.toURI().toURL().toExternalForm();
     final LocalResource jarLr = createJarLocalResource(localJarPath);
     lrMap.put(DagUtils.getBaseName(jarLr), jarLr);
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 9e49754..9c7ee54 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -76,6 +76,7 @@ import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.server.KillQueryImpl;
+import org.apache.hive.service.server.KillQueryZookeeperManager;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,7 +168,11 @@ public class HiveSessionImpl implements HiveSession {
     } catch (Exception e) {
       throw new HiveSQLException(e);
     }
-    sessionState.setKillQuery(new KillQueryImpl(operationManager));
+    KillQueryZookeeperManager killQueryZookeeperManager = null;
+    if (sessionManager != null) {
+      killQueryZookeeperManager = sessionManager.getKillQueryZookeeperManager();
+    }
+    sessionState.setKillQuery(new KillQueryImpl(operationManager, killQueryZookeeperManager));
     SessionState.start(sessionState);
     try {
       sessionState.loadAuxJars();
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 277519c..57031f4 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -54,6 +54,7 @@ import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.server.HiveServer2;
+import org.apache.hive.service.server.KillQueryZookeeperManager;
 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,6 +86,7 @@ public class SessionManager extends CompositeService {
   private int ipAddressLimit;
   private int userIpAddressLimit;
   private final OperationManager operationManager = new OperationManager();
+  private KillQueryZookeeperManager killQueryZookeeperManager;
   private ThreadPoolExecutor backgroundOperationPool;
   private boolean isOperationLogEnabled;
   private File operationLogRootDir;
@@ -114,6 +116,12 @@ public class SessionManager extends CompositeService {
     }
     createBackgroundOperationPool();
     addService(operationManager);
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) &&
+        !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE) &&
+        hiveConf.getBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) {
+      killQueryZookeeperManager = new KillQueryZookeeperManager(operationManager, hiveServer2);
+      addService(killQueryZookeeperManager);
+    }
     initSessionImplClassName();
     Metrics metrics = MetricsFactory.getInstance();
     if(metrics != null){
@@ -625,6 +633,10 @@ public class SessionManager extends CompositeService {
     return operationManager;
   }
 
+  public KillQueryZookeeperManager getKillQueryZookeeperManager() {
+    return killQueryZookeeperManager;
+  }
+
   private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>();
 
   public static void setIpAddress(String ipAddress) {
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 3e7d127..42b7e59 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -645,7 +645,7 @@ public class HiveServer2 extends CompositeService {
     return false;
   }
 
-  private String getServerInstanceURI() throws Exception {
+  public String getServerInstanceURI() throws Exception {
     if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
       throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
     }
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index 883e32b..e15cd1f 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -22,10 +22,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import org.apache.hadoop.hive.ql.session.KillQuery;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -55,14 +55,19 @@ public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
   private final OperationManager operationManager;
-  private enum TagOrId {TAG, ID, UNKNOWN};
+  private final KillQueryZookeeperManager killQueryZookeeperManager;
 
-  public KillQueryImpl(OperationManager operationManager) {
+  private enum TagOrId {TAG, ID, UNKNOWN}
+
+
+  public KillQueryImpl(OperationManager operationManager, KillQueryZookeeperManager killQueryZookeeperManager) {
     this.operationManager = operationManager;
+    this.killQueryZookeeperManager = killQueryZookeeperManager;
   }
 
-  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException {
-    Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin)
+      throws IOException, YarnException {
+    Set<ApplicationId> childYarnJobs = new HashSet<>();
     GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
     gar.setScope(ApplicationsRequestScope.OWN);
     gar.setApplicationTags(Collections.singleton(tag));
@@ -70,10 +75,13 @@ public class KillQueryImpl implements KillQuery {
     ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
     GetApplicationsResponse apps = proxy.getApplications(gar);
     List<ApplicationReport> appsList = apps.getApplicationList();
-    for(ApplicationReport appReport : appsList) {
-      if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get()
-              .getUserName())) {
+    for (ApplicationReport appReport : appsList) {
+      if (doAsAdmin) {
         childYarnJobs.add(appReport.getApplicationId());
+      } else if (StringUtils.isNotBlank(doAs)) {
+        if (appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + doAs)) {
+          childYarnJobs.add(appReport.getApplicationId());
+        }
       }
     }
 
@@ -86,13 +94,13 @@ public class KillQueryImpl implements KillQuery {
     return childYarnJobs;
   }
 
-  public static void killChildYarnJobs(Configuration conf, String tag) {
+  public static void killChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin) {
     try {
       if (tag == null) {
         return;
       }
       LOG.info("Killing yarn jobs using query tag:" + tag);
-      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag, doAs, doAsAdmin);
       if (!childYarnJobs.isEmpty()) {
         YarnClient yarnClient = YarnClient.createYarnClient();
         yarnClient.init(conf);
@@ -102,7 +110,7 @@ public class KillQueryImpl implements KillQuery {
         }
       }
     } catch (IOException | YarnException ye) {
-      LOG.warn("Exception occurred while killing child job({})", ye);
+      LOG.warn("Exception occurred while killing child job({})", tag, ye);
     }
   }
 
@@ -110,76 +118,117 @@ public class KillQueryImpl implements KillQuery {
     boolean isAdmin = false;
     if (SessionState.get().getAuthorizerV2() != null) {
       try {
-        SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY,
-                new ArrayList<HivePrivilegeObject>(), new ArrayList<HivePrivilegeObject>(),
-                new HiveAuthzContext.Builder().build());
+        SessionState.get().getAuthorizerV2()
+            .checkPrivileges(HiveOperationType.KILL_QUERY, new ArrayList<>(),
+                new ArrayList<>(), new HiveAuthzContext.Builder().build());
         isAdmin = true;
       } catch (Exception e) {
+        LOG.warn("Error while checking privileges", e);
       }
     }
     return isAdmin;
   }
 
-  private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws
-          HiveSQLException {
-    if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get()
-            .getAuthenticator().getUserName())) {
+  private boolean cancelOperation(Operation operation, String doAs, boolean doAsAdmin, String errMsg)
+      throws HiveSQLException {
+    if (doAsAdmin || (!StringUtils.isBlank(doAs) && operation.getParentSession().getUserName().equals(doAs))) {
       OperationHandle handle = operation.getHandle();
       operationManager.cancelOperation(handle, errMsg);
       return true;
-    } else {
-      return false;
     }
+    return false;
+  }
+
+  public boolean isLocalQuery(String queryIdOrTag) {
+    TagOrId tagOrId = TagOrId.UNKNOWN;
+    if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
+      tagOrId = TagOrId.ID;
+    } else if (!operationManager.getOperationsByQueryTag(queryIdOrTag).isEmpty()) {
+      tagOrId = TagOrId.TAG;
+    }
+    return tagOrId != TagOrId.UNKNOWN;
   }
 
   @Override
   public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException {
-    try {
-      TagOrId tagOrId = TagOrId.UNKNOWN;
-      Set<Operation> operationsToKill = new HashSet<Operation>();
-      if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
-        operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag));
-        tagOrId = TagOrId.ID;
-      } else {
-        operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag));
-        if (!operationsToKill.isEmpty()) {
-          tagOrId = TagOrId.TAG;
-        }
+    killQuery(queryIdOrTag, errMsg, conf, false, SessionState.get().getUserName(), isAdmin());
+  }
+
+  public void killLocalQuery(String queryIdOrTag, HiveConf conf, String doAs, boolean doAsAdmin)
+      throws HiveException {
+    killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin);
+  }
+
+  private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean onlyLocal, String doAs,
+      boolean doAsAdmin) throws HiveException {
+    errMsg = StringUtils.defaultString(errMsg, KillQueriesOperation.KILL_QUERY_MESSAGE);
+    TagOrId tagOrId = TagOrId.UNKNOWN;
+    Set<Operation> operationsToKill = new HashSet<>();
+    if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
+      operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag));
+      tagOrId = TagOrId.ID;
+      LOG.debug("Query found with id: {}", queryIdOrTag);
+    } else {
+      operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag));
+      if (!operationsToKill.isEmpty()) {
+        tagOrId = TagOrId.TAG;
+        LOG.debug("Query found with tag: {}", queryIdOrTag);
       }
-      if (operationsToKill.isEmpty()) {
-        LOG.info("Query not found: " + queryIdOrTag);
+    }
+    if (!operationsToKill.isEmpty()) {
+      killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin);
+    } else {
+      LOG.debug("Query not found with tag/id: {}", queryIdOrTag);
+      if (!onlyLocal && killQueryZookeeperManager != null &&
+          conf.getBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) {
+        try {
+          LOG.debug("Killing query with zookeeper coordination: " + queryIdOrTag);
+          killQueryZookeeperManager
+              .killQuery(queryIdOrTag, SessionState.get().getAuthenticator().getUserName(), isAdmin());
+        } catch (IOException e) {
+          LOG.error("Kill query failed for queryId: " + queryIdOrTag, e);
+          throw new HiveException("Unable to kill query locally or on remote servers.", e);
+        }
+      } else {
+        LOG.warn("Unable to kill query with id {}", queryIdOrTag);
       }
-      boolean admin = isAdmin();
-      switch(tagOrId) {
-        case ID:
-          Operation operation = operationsToKill.iterator().next();
-          boolean canceled = cancelOperation(operation, admin, errMsg);
-          if (canceled) {
-            String queryTag = operation.getQueryTag();
-            if (queryTag == null) {
-              queryTag = queryIdOrTag;
-            }
-            killChildYarnJobs(conf, queryTag);
-          } else {
-            // no privilege to cancel
-            throw new HiveSQLException("No privilege to kill query id");
-          }
-          break;
-        case TAG:
-          int numCanceled = 0;
-          for (Operation operationToKill : operationsToKill) {
-            if (cancelOperation(operationToKill, admin, errMsg)) {
-              numCanceled++;
-            }
+    }
+  }
+
+  private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, TagOrId tagOrId,
+      Set<Operation> operationsToKill, String doAs, boolean doAsAdmin) throws HiveException {
+    try {
+      switch (tagOrId) {
+      case ID:
+        Operation operation = operationsToKill.iterator().next();
+        boolean canceled = cancelOperation(operation, doAs, doAsAdmin, errMsg);
+        if (canceled) {
+          String queryTag = operation.getQueryTag();
+          if (queryTag == null) {
+            queryTag = queryIdOrTag;
           }
-          killChildYarnJobs(conf, queryIdOrTag);
-          if (numCanceled == 0) {
-            throw new HiveSQLException("No privilege to kill query tag");
+          killChildYarnJobs(conf, queryTag, doAs, doAsAdmin);
+        } else {
+          // no privilege to cancel
+          throw new HiveSQLException("No privilege to kill query id");
+        }
+        break;
+      case TAG:
+        int numCanceled = 0;
+        for (Operation operationToKill : operationsToKill) {
+          if (cancelOperation(operationToKill, doAs, doAsAdmin, errMsg)) {
+            numCanceled++;
           }
-          break;
-        case UNKNOWN:
-          killChildYarnJobs(conf, queryIdOrTag);
-          break;
+        }
+        if (numCanceled == 0) {
+          throw new HiveSQLException("No privilege to kill query tag");
+        } else {
+          killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin);
+        }
+        break;
+      case UNKNOWN:
+      default:
+        break;
       }
     } catch (HiveSQLException e) {
       LOG.error("Kill query failed for query " + queryIdOrTag, e);
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java b/service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java
new file mode 100644
index 0000000..396364b
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ZookeeperUtils;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.cli.operation.OperationManager;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Kill query coordination service.
+ * When service discovery is enabled, a local kill query can request a kill
+ * on every other HS2 server by queryId or queryTag and wait for confirmation or denial.
+ * The communication is done through ZooKeeper.
+ */
+public class KillQueryZookeeperManager extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KillQueryZookeeperManager.class);
+  private static final String SASL_LOGIN_CONTEXT_NAME = "KillQueryZooKeeperClient";
+  public static final int MAX_WAIT_ON_CONFIRMATION_SECONDS = 30;
+  public static final int MAX_WAIT_ON_KILL_SECONDS = 180;
+
+  private CuratorFramework zooKeeperClient;
+  private String zkPrincipal, zkKeytab, zkNameSpace;
+  private final KillQueryImpl localKillQueryImpl;
+  private final HiveServer2 hiveServer2;
+  private HiveConf conf;
+
+  // Path cache to watch queries to kill
+  private PathChildrenCache killQueryListener = null;
+
+  public KillQueryZookeeperManager(OperationManager operationManager, HiveServer2 hiveServer2) {
+    super(KillQueryZookeeperManager.class.getSimpleName());
+    this.hiveServer2 = hiveServer2;
+    localKillQueryImpl = new KillQueryImpl(operationManager, this);
+  }
+
+  @Override
+  public synchronized void init(HiveConf conf) {
+    this.conf = conf;
+    zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE);
+    Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace),
+        HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE.varname + " cannot be null or empty");
+    this.zkPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+    this.zkKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+    this.zooKeeperClient = conf.getZKConfig().getNewZookeeperClient(getACLProviderForZKPath("/" + zkNameSpace));
+    this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
+
+    super.init(conf);
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    if (zooKeeperClient == null) {
+      throw new ServiceException("Failed to start zookeeperClient in KillQueryZookeeperManager");
+    }
+    try {
+      ZookeeperUtils.setupZookeeperAuth(this.getHiveConf(), SASL_LOGIN_CONTEXT_NAME, zkPrincipal, zkKeytab);
+      zooKeeperClient.start();
+      try {
+        zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + zkNameSpace);
+        if (ZookeeperUtils.isKerberosEnabled(conf)) {
+          zooKeeperClient.setACL().withACL(createSecureAcls()).forPath("/" + zkNameSpace);
+        }
+        LOG.info("Created the root namespace: " + zkNameSpace + " on ZooKeeper");
+      } catch (KeeperException e) {
+        if (e.code() != KeeperException.Code.NODEEXISTS) {
+          LOG.error("Unable to create namespace: " + zkNameSpace + " on ZooKeeper", e);
+          throw e;
+        }
+      }
+      // Create a path cache and start to listen for every kill query request from other servers.
+      killQueryListener = new PathChildrenCache(zooKeeperClient, "/" + zkNameSpace, false);
+      killQueryListener.start(PathChildrenCache.StartMode.NORMAL);
+      startListeningForQueries();
+      // Init closeable utils in case register is not called (see HIVE-13322)
+      CloseableUtils.class.getName();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start zookeeperClient in KillQueryZookeeperManager", e);
+    }
+    LOG.info("KillQueryZookeeperManager service started.");
+  }
+
+  private ACLProvider getACLProviderForZKPath(String zkPath) {
+    final boolean isSecure = ZookeeperUtils.isKerberosEnabled(conf);
+    return new ACLProvider() {
+      @Override
+      public List<ACL> getDefaultAcl() {
+        // We always return something from getAclForPath so this should not happen.
+        LOG.warn("getDefaultAcl was called");
+        return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+      }
+
+      @Override
+      public List<ACL> getAclForPath(String path) {
+        if (!isSecure || path == null || !path.contains(zkPath)) {
+          // No security or the path is below the user path - full access.
+          return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+        }
+        return createSecureAcls();
+      }
+    };
+  }
+
+  private static List<ACL> createSecureAcls() {
+    // Read all to the world
+    List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
+    // Create/Delete/Write/Admin to creator
+    nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
+    return nodeAcls;
+  }
+
+  private void startListeningForQueries() {
+    PathChildrenCacheListener listener = (client, pathChildrenCacheEvent) -> {
+      if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
+        KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace,
+            ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath()));
+        Optional<KillQueryZookeeperData> data = barrier.getKillQueryData();
+        if (!data.isPresent()) {
+          return;
+        }
+        KillQueryZookeeperData killQuery = data.get();
+        LOG.debug("Kill query request with id {}", killQuery.getQueryId());
+        if (getServerHost().equals(killQuery.getRequestingServer())) {
+          // This server posted the request itself; nothing to do here
+          return;
+        }
+        if (localKillQueryImpl.isLocalQuery(killQuery.getQueryId())) {
+          LOG.info("Killing query with id {}", killQuery.getQueryId());
+          barrier.confirmProgress(getServerHost());
+          try {
+            localKillQueryImpl
+                .killLocalQuery(killQuery.getQueryId(), conf, killQuery.getDoAs(), killQuery.isDoAsAdmin());
+            barrier.confirmDone(getServerHost());
+          } catch (Exception e) {
+            LOG.error("Unable to kill local query", e);
+            barrier.confirmFailed(getServerHost());
+          }
+        } else {
+          LOG.debug("Confirm unknown kill query request with id {}", killQuery.getQueryId());
+          barrier.confirmNo(getServerHost());
+        }
+      }
+    };
+    LOG.info("Start to listen for kill query requests.");
+    killQueryListener.getListenable().addListener(listener);
+  }
+
+  @Override
+  public synchronized void stop() {
+    super.stop();
+    LOG.info("Stopping KillQueryZookeeperManager service.");
+    CloseableUtils.closeQuietly(killQueryListener);
+    CloseableUtils.closeQuietly(zooKeeperClient);
+  }
+
+  private List<String> getAllServerUrls() {
+    List<String> serverHosts = new ArrayList<>();
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) && !conf
+        .getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE)) {
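+      // Only dynamic service discovery (without active/passive HA) registers every live HS2 under this namespace.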
+      String zooKeeperNamespace = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+      try {
+        serverHosts.addAll(zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace));
+      } catch (Exception e) {
+        LOG.error("Unable the get available server hosts", e);
+      }
+    }
+    return serverHosts;
+  }
+
+  private String getServerHost() {
+    if (hiveServer2 == null) {
+      return "";
+    }
+    try {
+      return removeDelimiter(hiveServer2.getServerInstanceURI());
+    } catch (Exception e) {
+      LOG.error("Unable to determine the server host", e);
+      return "";
+    }
+  }
+
+  // for debugging
+  private static class ZkConnectionStateListener implements ConnectionStateListener {
+    @Override
+    public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
+      LOG.info("Connection state change notification received. State: {}", connectionState);
+    }
+  }
+
+  /**
+   * Posts a kill query request on ZooKeeper for the other HS2 instances. If service discovery is not enabled or
+   * no other server is registered, this does nothing. Otherwise it posts the kill query request on ZooKeeper
+   * and waits for the other instances to confirm or deny the kill.
+   *
+   * @param queryIdOrTag queryId or tag to kill
+   * @param doAs         user requesting the kill
+   * @param doAsAdmin    admin user requesting the kill (with KILLQUERY privilege)
+   * @throws IOException If the kill query failed
+   */
+  public void killQuery(String queryIdOrTag, String doAs, boolean doAsAdmin) throws IOException {
+    List<String> serverHosts = getAllServerUrls();
+    if (serverHosts.size() < 2) {
+      return;
+    }
+    KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace);
+    boolean result;
+    try {
+      barrier.setBarrier(queryIdOrTag, hiveServer2.getServerInstanceURI(), doAs, doAsAdmin);
+      LOG.info("Created kill query barrier in path: {} for queryId: {}", barrier.getBarrierPath(), queryIdOrTag);
+      result = barrier.waitOnBarrier(serverHosts.size() - 1, MAX_WAIT_ON_CONFIRMATION_SECONDS,
+          MAX_WAIT_ON_KILL_SECONDS, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Unable to create Barrier on Zookeeper for KillQuery", e);
+      throw new IOException(e);
+    }
+    if (!result) {
+      throw new IOException("Unable to kill query on remote servers");
+    }
+  }
+
+  /**
+   * Data to post to ZooKeeper for a kill query request. The fields are serialized with a ':' delimiter.
+   * Every ':' in requestingServer is removed; the other fields must not contain any ':'.
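+   * Example serialized form (illustrative): {@code hive_20200414_0001:hs2host10001:bob:false}.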
+   */
+  public static class KillQueryZookeeperData {
+    private String queryId;
+    private String requestingServer;
+    private String doAs;
+    private boolean doAsAdmin;
+
+    public KillQueryZookeeperData(String queryId, String requestingServer, String doAs, boolean doAsAdmin) {
+      if (!StringUtils.equals(queryId, removeDelimiter(queryId))) {
+        throw new IllegalArgumentException("QueryId can not contain any ':' character.");
+      }
+      this.queryId = queryId;
+      this.requestingServer = removeDelimiter(requestingServer);
+      if (!StringUtils.equals(doAs, removeDelimiter(doAs))) {
+        throw new IllegalArgumentException("doAs can not contain any ':' character.");
+      }
+      this.doAs = doAs;
+      this.doAsAdmin = doAsAdmin;
+    }
+
+    public KillQueryZookeeperData(String data) {
+      if (data == null) {
+        return;
+      }
+
+      String[] elem = data.split(":");
+      queryId = elem[0];
+      requestingServer = elem[1];
+      doAs = elem[2];
+      doAsAdmin = Boolean.parseBoolean(elem[3]);
+    }
+
+    @Override
+    public String toString() {
+      return queryId + ":" + requestingServer + ":" + doAs + ":" + doAsAdmin;
+    }
+
+    public String getQueryId() {
+      return queryId;
+    }
+
+    public String getRequestingServer() {
+      return requestingServer;
+    }
+
+    public String getDoAs() {
+      return doAs;
+    }
+
+    public boolean isDoAsAdmin() {
+      return doAsAdmin;
+    }
+  }
+
+  /**
+   * ZooKeeper barrier for the kill query operation.
+   * It posts a kill query request on ZooKeeper and waits until the given number of service instances respond.
+   * The implementation is based on org.apache.curator.framework.recipes.barriers.DistributedBarrier.
+   */
+  public static class KillQueryZookeeperBarrier {
+    private final CuratorFramework client;
+    private final String barrierPath;
+    private final Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        client.postSafeNotify(KillQueryZookeeperBarrier.this);
+      }
+    };
+
+    /**
+     * @param client          client
+     * @param barrierRootPath rootPath to put the barrier
+     */
+    public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath) {
+      this(client, barrierRootPath, UUID.randomUUID().toString());
+    }
+
+    /**
+     * @param client          client
+     * @param barrierRootPath rootPath to put the barrier
+     * @param barrierPath     name of the barrier
+     */
+    public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath, String barrierPath) {
+      this.client = client;
+      this.barrierPath = PathUtils.validatePath(barrierRootPath + "/" + barrierPath);
+    }
+
+    public String getBarrierPath() {
+      return barrierPath;
+    }
+
+    /**
+     * Utility to set the barrier node.
+     *
+     * @throws Exception errors
+     */
+    public synchronized void setBarrier(String queryId, String requestingServer, String doAs, boolean doAsAdmin)
+        throws Exception {
+      try {
+        KillQueryZookeeperData data = new KillQueryZookeeperData(queryId, requestingServer, doAs, doAsAdmin);
+        client.create().creatingParentContainersIfNeeded()
+            .forPath(barrierPath, data.toString().getBytes(StandardCharsets.UTF_8));
+      } catch (KeeperException.NodeExistsException e) {
+        throw new IllegalStateException("Barrier with this path already exists");
+      }
+    }
+
+    public synchronized Optional<KillQueryZookeeperData> getKillQueryData() throws Exception {
+      if (client.checkExists().forPath(barrierPath) != null) {
+        byte[] data = client.getData().forPath(barrierPath);
+        return Optional.of(new KillQueryZookeeperData(new String(data, StandardCharsets.UTF_8)));
+      }
+      return Optional.empty();
+    }
+
+    /**
+     * Confirm that this server does not know the query identified by the queryId in the barrier.
+     *
+     * @param serverId The serverHost confirming the request
+     * @throws Exception If confirmation failed
+     */
+    public synchronized void confirmNo(String serverId) throws Exception {
+      if (client.checkExists().forPath(barrierPath) != null) {
+        client.create().forPath(barrierPath + "/NO:" + serverId);
+      } else {
+        throw new IllegalStateException("Barrier is not initialised");
+      }
+    }
+
+    /**
+     * Confirm that this server knows the query identified by the queryId in the barrier and is starting the kill query process.
+     *
+     * @param serverId The serverHost confirming the request
+     * @throws Exception If confirmation failed
+     */
+    public synchronized void confirmProgress(String serverId) throws Exception {
+      if (client.checkExists().forPath(barrierPath) != null) {
+        client.create().forPath(barrierPath + "/PROGRESS:" + serverId);
+      } else {
+        throw new IllegalStateException("Barrier is not initialised");
+      }
+    }
+
+    /**
+     * Confirm that this server killed the query identified by the queryId in the barrier.
+     *
+     * @param serverId The serverHost confirming the request
+     * @throws Exception If confirmation failed
+     */
+    public synchronized void confirmDone(String serverId) throws Exception {
+      if (client.checkExists().forPath(barrierPath) != null) {
+        if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
+          client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
+        }
+        client.create().forPath(barrierPath + "/DONE:" + serverId);
+      } else {
+        throw new IllegalStateException("Barrier is not initialised");
+      }
+    }
+
+    /**
+     * Confirm that this server failed to kill the query identified by the queryId in the barrier.
+     *
+     * @param serverId The serverHost confirming the request
+     * @throws Exception If confirmation failed
+     */
+    public synchronized void confirmFailed(String serverId) throws Exception {
+      if (client.checkExists().forPath(barrierPath) != null) {
+        if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
+          client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
+        }
+        client.create().forPath(barrierPath + "/FAILED:" + serverId);
+      } else {
+        throw new IllegalStateException("Barrier is not initialised");
+      }
+    }
+
+    /**
+     * Wait for every server to either confirm killing the query or confirm not knowing it.
+     *
+     * @param confirmationCount     number of confirmations to wait for
+     * @param maxWaitOnConfirmation timeout for waiting on NO confirmations
+     * @param maxWaitOnKill         timeout for waiting on the actual kill query operation
+     * @param unit                  time unit for timeouts
+     * @return true if the kill was confirmed, false on timeout or if everybody voted for NO
+     * @throws Exception If confirmation failed
+     */
+    public synchronized boolean waitOnBarrier(int confirmationCount, long maxWaitOnConfirmation, long maxWaitOnKill,
+        TimeUnit unit) throws Exception {
+      long startMs = System.currentTimeMillis();
+      long startKill = -1;
+      long maxWaitMs = TimeUnit.MILLISECONDS.convert(maxWaitOnConfirmation, unit);
+      long maxWaitOnKillMs = TimeUnit.MILLISECONDS.convert(maxWaitOnKill, unit);
+
+      boolean progress = false;
+      boolean result = false;
+      while (true) {
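+        // Read the children with a watcher attached, so the wait() calls below are notified on any change.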
+        List<String> children = client.getChildren().usingWatcher(watcher).forPath(barrierPath);
+        boolean concluded = false;
+        for (String child : children) {
+          if (child.startsWith("DONE")) {
+            result = true;
+            concluded = true;
+            break;
+          }
+          if (child.startsWith("FAILED")) {
+            concluded = true;
+            break;
+          }
+          if (child.startsWith("PROGRESS")) {
+            progress = true;
+          }
+        }
+        if (concluded) {
+          break;
+        }
+        if (progress) {
+          // Wait for the kill query to finish
+          if (startKill < 0) {
+            startKill = System.currentTimeMillis();
+          }
+          long elapsed = System.currentTimeMillis() - startKill;
+          long thisWaitMs = maxWaitOnKillMs - elapsed;
+          if (thisWaitMs <= 0) {
+            break;
+          }
+          wait(thisWaitMs);
+        } else {
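+          // No DONE, FAILED or PROGRESS yet: once every polled server has answered NO, the query is unknown cluster-wide.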
+          if (children.size() == confirmationCount) {
+            result = false;
+            break;
+          }
+          // Wait for confirmation
+          long elapsed = System.currentTimeMillis() - startMs;
+          long thisWaitMs = maxWaitMs - elapsed;
+          if (thisWaitMs <= 0) {
+            break;
+          }
+          wait(thisWaitMs);
+        }
+
+      }
+      client.delete().deletingChildrenIfNeeded().forPath(barrierPath);
+      return result;
+    }
+  }
+
+  private static String removeDelimiter(String in) {
+    if (in == null) {
+      return null;
+    }
+    return in.replaceAll(":", "");
+  }
+}
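
For orientation, the barrier handshake above reduces to a handful of Curator primitives. The
following sketch is not part of the patch: it walks a single round of the protocol, assuming a
ZooKeeper server on localhost:2181 and using an illustrative class name, server ids and queryId.
The requester posts the payload node, a responder that owns the query adds PROGRESS and DONE
children, and the requester concludes on the first DONE child.

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class KillQueryBarrierSketch {
      public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient("localhost:2181",
            new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // Requester: post the barrier node with the "queryId:server:doAs:doAsAdmin" payload.
        String barrier = "/killQueries/" + UUID.randomUUID();
        zk.create().creatingParentContainersIfNeeded()
            .forPath(barrier, "hive_20200414_0001:hs2one:bob:false".getBytes(StandardCharsets.UTF_8));

        // Responder that owns the query: acknowledge progress, kill, then report the outcome.
        zk.create().forPath(barrier + "/PROGRESS:hs2two");
        zk.delete().forPath(barrier + "/PROGRESS:hs2two");
        zk.create().forPath(barrier + "/DONE:hs2two");

        // Requester: any DONE child concludes the barrier successfully; NO children from every
        // polled server would conclude it with a negative result instead.
        boolean killed = zk.getChildren().forPath(barrier).stream()
            .anyMatch(child -> child.startsWith("DONE"));
        System.out.println("query killed remotely: " + killed);

        zk.delete().deletingChildrenIfNeeded().forPath(barrier);
        zk.close();
      }
    }
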
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
index 71d8651..1e35795 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
@@ -29,6 +29,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.nodes.PersistentNode;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryOneTime;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -337,6 +338,9 @@ public class ZooKeeperHiveHelper {
     }
     if (maxRetries > 0) {
       builder = builder.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
+    } else {
+      // Retry policy is mandatory
+      builder = builder.retryPolicy(new RetryOneTime(1000));
     }
     if (zooKeeperAclProvider != null) {
       builder = builder.aclProvider(zooKeeperAclProvider);