Posted to commits@hive.apache.org by sa...@apache.org on 2018/08/13 05:46:58 UTC

hive git commit: HIVE-19924: Tag distcp jobs run by Repl Load (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

Repository: hive
Updated Branches:
  refs/heads/master 4a30574d3 -> 250e10ecf


HIVE-19924: Tag distcp jobs run by Repl Load (Mahesh Kumar Behera, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/250e10ec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/250e10ec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/250e10ec

Branch: refs/heads/master
Commit: 250e10ecf00e4b1e2536ca943be7e9068d699a6c
Parents: 4a30574
Author: Sankar Hariappan <sa...@apache.org>
Authored: Mon Aug 13 11:16:32 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Mon Aug 13 11:16:32 2018 +0530

----------------------------------------------------------------------
 .../apache/hive/jdbc/BaseJdbcWithMiniLlap.java  |   3 +-
 .../org/apache/hive/jdbc/TestJdbcDriver2.java   |  28 ++++
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   |  66 ----------
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java    | 128 ++++++++++++++++++-
 .../org/apache/hadoop/hive/ql/QueryState.java   |  27 ++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   2 +-
 .../org/apache/hadoop/hive/ql/exec/Task.java    |   5 +
 .../ql/exec/tez/KillTriggerActionHandler.java   |   3 +-
 .../hive/ql/exec/tez/WorkloadManager.java       |   2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  58 +++++----
 .../hadoop/hive/ql/session/KillQuery.java       |   3 +-
 .../hadoop/hive/ql/session/NullKillQuery.java   |   3 +-
 .../org/apache/hive/service/cli/CLIService.java |   2 +-
 .../hive/service/cli/operation/Operation.java   |   8 ++
 .../service/cli/operation/OperationManager.java |  35 ++++-
 .../service/cli/operation/SQLOperation.java     |   4 +-
 .../hive/service/cli/session/HiveSession.java   |   2 +
 .../service/cli/session/HiveSessionImpl.java    |   5 +
 .../hive/service/server/KillQueryImpl.java      |  88 ++++++++++++-
 19 files changed, 361 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
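
In short: every query now tags the MR/YARN jobs it spawns (including the distcp copy
jobs launched by REPL LOAD) with its query id via mapreduce.job.tags, and KILL QUERY
uses that tag to locate and kill the child YARN applications. A minimal JDBC sketch of
the default kill-by-id flow, modeled on the tests in this commit (the URL, credentials,
database name and dump path are illustrative):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.Statement;
  import org.apache.hive.jdbc.HiveStatement;

  public class KillByQueryId {
    public static void main(String[] args) throws Exception {
      Connection con = DriverManager.getConnection(
          "jdbc:hive2://localhost:10000/default", "hive", "");
      Connection con2 = DriverManager.getConnection(
          "jdbc:hive2://localhost:10000/default", "hive", "");

      HiveStatement stmt = (HiveStatement) con.createStatement();
      stmt.executeAsync("repl load repl_db from '/tmp/repl_dump'");
      // Child YARN jobs (e.g. distcp) are tagged with this id by default.
      String queryId = stmt.getQueryId();

      // Kill from a second connection while the load is still running; this
      // cancels the operation and kills its tagged child YARN jobs.
      Statement killer = con2.createStatement();
      killer.execute("kill query '" + queryId + "'");

      killer.close();
      stmt.close();
      con2.close();
      con.close();
    }
  }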


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 98f4729..d2e9514 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -110,7 +110,7 @@ public abstract class BaseJdbcWithMiniLlap {
   private static Connection hs2Conn = null;
 
   // This method should be called by sub-classes in a @BeforeClass initializer
-  public static void beforeTest(HiveConf inputConf) throws Exception {
+  public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
     conf = inputConf;
     Class.forName(MiniHS2.getJdbcDriverName());
     miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
@@ -120,6 +120,7 @@ public abstract class BaseJdbcWithMiniLlap {
     Map<String, String> confOverlay = new HashMap<String, String>();
     miniHS2.start(confOverlay);
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+    return miniHS2;
   }
 
   static HiveConf defaultConf() throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f552b0..c2f5703 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3036,6 +3036,34 @@ public class TestJdbcDriver2 {
     }
   }
 
+  @Test
+  public void testGetQueryId() throws Exception {
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    HiveStatement stmt1 = (HiveStatement) con.createStatement();
+    stmt.executeAsync("create database query_id_test with dbproperties ('repl.source.for' = '1, 2, 3')");
+    String queryId = stmt.getQueryId();
+    assertFalse(queryId.isEmpty());
+    stmt.getUpdateCount();
+
+    stmt1.executeAsync("repl status query_id_test with ('hive.query.id' = 'hiveCustomTag')");
+    String queryId1 = stmt1.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId1));
+    assertFalse(queryId.equals(queryId1));
+    assertFalse(queryId1.isEmpty());
+    stmt1.getUpdateCount();
+
+    stmt.executeAsync("select count(*) from " + dataTypeTableName);
+    queryId = stmt.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId));
+    assertFalse(queryId.isEmpty());
+    assertFalse(queryId.equals(queryId1));
+    stmt.getUpdateCount();
+
+    stmt.execute("drop database query_id_test");
+    stmt.close();
+    stmt1.close();
+  }
+
   // Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
   @Test(expected = HiveSQLException.class)
   public void testConnectInvalidDatabase() throws SQLException {

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 2139709..5cb0a88 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -21,7 +21,6 @@ package org.apache.hive.jdbc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1477,71 +1476,6 @@ public class TestJdbcWithMiniHS2 {
     }
   }
 
-  /**
-   * Test CLI kill command of a query that is running.
-   * We spawn 2 threads - one running the query and
-   * the other attempting to cancel.
-   * We're using a dummy udf to simulate a query,
-   * that runs for a sufficiently long time.
-   * @throws Exception
-   */
-  @Test
-  public void testKillQuery() throws Exception {
-    Connection con = conTestDb;
-    Connection con2 = getConnection(testDbName);
-
-    String udfName = SleepMsUDF.class.getName();
-    Statement stmt1 = con.createStatement();
-    final Statement stmt2 = con2.createStatement();
-    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
-    stmt1.close();
-    final Statement stmt = con.createStatement();
-    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
-    final ExceptionHolder tKillHolder = new ExceptionHolder();
-
-    // Thread executing the query
-    Thread tExecute = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          System.out.println("Executing query: ");
-          // The test table has 500 rows, so total query time should be ~ 500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
-              "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
-          fail("Expecting SQLException");
-        } catch (SQLException e) {
-          tExecuteHolder.throwable = e;
-        }
-      }
-    });
-    // Thread killing the query
-    Thread tKill = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(2000);
-          String queryId = ((HiveStatement) stmt).getQueryId();
-          System.out.println("Killing query: " + queryId);
-
-          stmt2.execute("kill query '" + queryId + "'");
-          stmt2.close();
-        } catch (Exception e) {
-          tKillHolder.throwable = e;
-        }
-      }
-    });
-
-    tExecute.start();
-    tKill.start();
-    tExecute.join();
-    tKill.join();
-    stmt.close();
-    con2.close();
-
-    assertNotNull("tExecute", tExecuteHolder.throwable);
-    assertNull("tCancel", tKillHolder.throwable);
-  }
-
   private static class ExceptionHolder {
     Throwable throwable;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index c02980b..4942ed9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -30,21 +30,55 @@ import org.apache.hadoop.io.NullWritable;
 import org.junit.BeforeClass;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-
+import org.junit.AfterClass;
+import org.junit.Test;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Connection;
+import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 /**
  * TestJdbcWithMiniLlap for Arrow format
  */
 public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
+  private static MiniHS2 miniHS2 = null;
+  private static final String tableName = "testJdbcMinihs2Tbl";
+  private static String dataFileDir;
+  private static final String testDbName = "testJdbcMinihs2";
 
+  private static class ExceptionHolder {
+    Throwable throwable;
+  }
 
   @BeforeClass
   public static void beforeTest() throws Exception {
     HiveConf conf = defaultConf();
     conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
-    BaseJdbcWithMiniLlap.beforeTest(conf);
+    MiniHS2.cleanupLocalDir();
+    miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+
+    Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
+            System.getProperty("user.name"), "bar");
+    Statement stmt = conDefault.createStatement();
+    stmt.execute("drop database if exists " + testDbName + " cascade");
+    stmt.execute("create database " + testDbName);
+    stmt.close();
+    conDefault.close();
+  }
+
+  @AfterClass
+  public static void afterTest() {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
   }
 
   @Override
@@ -230,5 +264,95 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
   }
 
+  /**
+   * SleepMsUDF
+   */
+  public static class SleepMsUDF extends UDF {
+    public Integer evaluate(int value, int ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // No-op
+      }
+      return value;
+    }
+  }
+
+  /**
+   * Test the KILL QUERY command on a running query.
+   * We spawn two threads: one runs the query and
+   * the other attempts to kill it.
+   * We use a dummy UDF to simulate a query
+   * that runs for a sufficiently long time.
+   * @throws Exception
+   */
+  @Test
+  public void testKillQuery() throws Exception {
+    Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+            System.getProperty("user.name"), "bar");
+    Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+            System.getProperty("user.name"), "bar");
+
+    String udfName = SleepMsUDF.class.getName();
+    Statement stmt1 = con.createStatement();
+    final Statement stmt2 = con2.createStatement();
+    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+
+    String tblName = testDbName + "." + tableName;
+
+    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
+    stmt1.execute("create table " + tblName + " (int_col int, value string) ");
+    stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tblName);
+
+
+    stmt1.close();
+    final Statement stmt = con.createStatement();
+    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    final ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    // Thread executing the query
+    Thread tExecute = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          System.out.println("Executing query: ");
+          stmt.execute("set hive.llap.execution.mode = none");
+
+          // The test table has 500 rows, so total query time should be ~ 500*500ms
+          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
+                  "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
+        } catch (SQLException e) {
+          tExecuteHolder.throwable = e;
+        }
+      }
+    });
+    // Thread killing the query
+    Thread tKill = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(5000);
+          String queryId = ((HiveStatement) stmt).getQueryId();
+          System.out.println("Killing query: " + queryId);
+          stmt2.execute("kill query '" + queryId + "'");
+          stmt2.close();
+        } catch (Exception e) {
+          tKillHolder.throwable = e;
+        }
+      }
+    });
+
+    tExecute.start();
+    tKill.start();
+    tExecute.join();
+    tKill.join();
+    stmt.close();
+    con2.close();
+    con.close();
+
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b1a602c..028dd60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +55,10 @@ public class QueryState {
    */
   private long numModifiedRows = 0;
 
+  // Holds the tag supplied by the user to uniquely identify the query. Can be used to kill the query if the
+  // query id cannot be retrieved for some reason, e.g. after a HiveServer2 restart.
+  private String queryTag = null;
+
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -62,6 +67,7 @@ public class QueryState {
     this.queryConf = conf;
   }
 
+  // Get the query id stored in query specific config.
   public String getQueryId() {
     return (queryConf.getVar(HiveConf.ConfVars.HIVEQUERYID));
   }
@@ -112,6 +118,25 @@ public class QueryState {
   public void setNumModifiedRows(long numModifiedRows) {
     this.numModifiedRows = numModifiedRows;
   }
+
+  public String getQueryTag() {
+    return queryTag;
+  }
+
+  public void setQueryTag(String queryTag) {
+    this.queryTag = queryTag;
+  }
+
+  public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
+    String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
+    if (jobTag == null) {
+      jobTag = queryTag;
+    } else {
+      jobTag = jobTag.concat("," + queryTag);
+    }
+    queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+  }
+
   /**
    * Builder to instantiate the QueryState object.
    */
@@ -221,6 +246,8 @@ public class QueryState {
       if (generateNewQueryId) {
         String queryId = QueryPlan.makeQueryId();
         queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
+        setMapReduceJobTag(queryConf, queryId);
+
         // FIXME: druid storage handler relies on query.id to maintain some staging directories
         // expose queryid to session level
         if (hiveConf != null) {
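
Note the append semantics of setMapReduceJobTag above: the new tag is concatenated onto
any existing mapreduce.job.tags value rather than replacing it, so the default query-id
tag and a user-supplied tag can coexist on the same job. A small illustration (the tag
values are made up):

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.QueryState;
  import org.apache.hadoop.mapreduce.MRJobConfig;

  public class JobTagDemo {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();
      QueryState.setMapReduceJobTag(conf, "hive_20180813_1234");  // default query-id tag
      QueryState.setMapReduceJobTag(conf, "replTag1");            // user-supplied tag
      // Both tags ride along on any MR/YARN job launched with this conf:
      System.out.println(conf.get(MRJobConfig.JOB_TAGS));         // hive_20180813_1234,replTag1
    }
  }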

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index accd7f1..467f728 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3296,7 +3296,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   private int killQuery(Hive db, KillQueryDesc desc) throws HiveException {
     SessionState sessionState = SessionState.get();
     for (String queryId : desc.getQueryIds()) {
-      sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY");
+      sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY", db.getConf());
     }
     LOG.info("kill query called ({})", desc.getQueryIds());
     return 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 240208a..11ef62c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,6 +203,10 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
       if (hiveHistory != null) {
         hiveHistory.logPlanProgress(queryPlan);
       }
+
+      if (conf != null) {
+        LOG.debug("Executing task with mapred job tags: " + conf.get(MRJobConfig.JOB_TAGS));
+      }
       int retval = execute(driverContext);
       this.setDone();
       if (hiveHistory != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 50d234d..f357775 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -42,7 +42,8 @@ public class KillTriggerActionHandler implements TriggerActionHandler<TezSession
             KillQuery killQuery = sessionState.getKillQuery();
             // if kill query is null then session might have been released to pool or closed already
             if (killQuery != null) {
-              sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
+              sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(),
+                      sessionState.getConf());
             }
           } catch (HiveException e) {
             LOG.warn("Unable to kill query {} for trigger violation");

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 7137a17..5326e35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -438,7 +438,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
             WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
             LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
             try {
-              kq.killQuery(queryId, reason);
+              kq.killQuery(queryId, reason, toKill.getConf());
               addKillQueryResult(toKill, true);
               killCtx.killSessionFuture.set(true);
               wmEvent.endEvent(toKill);

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adaa3d3..e4186c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -228,20 +230,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
           break;
         case TOK_REPL_CONFIG:
-          Map<String, String> replConfigs
-                  = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-          if (null != replConfigs) {
-            for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-              conf.set(config.getKey(), config.getValue());
-            }
-
-            // As hive conf is changed, need to get the Hive DB again with it.
-            try {
-              db = Hive.get(conf);
-            } catch (HiveException e) {
-              throw new SemanticException(e);
-            }
-          }
+          setConfigs((ASTNode) childNode.getChild(0));
           break;
         default:
           throw new SemanticException("Unrecognized token in REPL LOAD statement");
@@ -360,6 +349,32 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
+  private void setConfigs(ASTNode node) throws SemanticException {
+    Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
+    if (null != replConfigs) {
+      for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+        String key = config.getKey();
+        // don't set the query id in the config
+        if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
+          String queryTag = config.getValue();
+          if (!StringUtils.isEmpty(queryTag)) {
+            QueryState.setMapReduceJobTag(conf, queryTag);
+          }
+          queryState.setQueryTag(queryTag);
+        } else {
+          conf.set(key, config.getValue());
+        }
+      }
+
+      // As hive conf is changed, need to get the Hive DB again with it.
+      try {
+        db = Hive.get(conf);
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+    }
+  }
+
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -371,20 +386,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
         break;
       case TOK_REPL_CONFIG:
-        Map<String, String> replConfigs
-            = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-        if (null != replConfigs) {
-          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-            conf.set(config.getKey(), config.getValue());
-          }
-
-          // As hive conf is changed, need to get the Hive DB again with it.
-          try {
-            db = Hive.get(conf);
-          } catch (HiveException e) {
-            throw new SemanticException(e);
-          }
-        }
+        setConfigs((ASTNode) childNode.getChild(0));
         break;
       default:
         throw new SemanticException("Unrecognized token in REPL STATUS statement");
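
The effect of the new setConfigs method is that 'hive.query.id' supplied in a WITH clause
is intercepted: it never overwrites the real query id in the conf, but is applied as an
extra job tag and recorded on the QueryState. A hedged sketch of killing a repl query by
that user-chosen tag (two open HiveServer2 connections are assumed; the database name,
dump path and tag value are illustrative):

  import java.sql.Connection;
  import java.sql.Statement;

  public class KillByTag {
    static void loadThenKill(Connection loader, Connection killer) throws Exception {
      Statement s1 = loader.createStatement();
      // The WITH clause tags the load and all of its child YARN jobs with 'replTag1'.
      s1.execute("repl load repl_db from '/tmp/repl_dump'"
          + " with ('hive.query.id' = 'replTag1')");

      // Typically issued from the second session while the load is still running.
      // Even if the original query id is unknown (e.g. after an HS2 restart), the
      // tag can be passed to KILL QUERY in place of the id.
      Statement s2 = killer.createStatement();
      s2.execute("kill query 'replTag1'");

      s2.close();
      s1.close();
    }
  }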

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
index 2e183dc..01dc7e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.hive.ql.session;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public interface KillQuery {
-  void killQuery(String queryId, String errMsg) throws HiveException;
+  void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
index b62f22c..eac2936 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
@@ -18,11 +18,12 @@
 
 package org.apache.hadoop.hive.ql.session;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public class NullKillQuery implements KillQuery {
   @Override
-  public void killQuery(String queryId, String errMsg) throws HiveException {
+  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
     // Do nothing
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index e28e513..9cbe7e1 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -624,7 +624,7 @@ public class CLIService extends CompositeService implements ICLIService {
   public String getQueryId(TOperationHandle opHandle) throws HiveSQLException {
     Operation operation = sessionManager.getOperationManager().getOperation(
         new OperationHandle(opHandle));
-    final String queryId = operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
+    final String queryId = operation.getQueryId();
     LOG.debug(opHandle + ": getQueryId() " + queryId);
     return queryId;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 1ee0756..4b9cbd3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -413,4 +413,12 @@ public abstract class Operation {
   protected void markOperationCompletedTime() {
     operationComplete = System.currentTimeMillis();
   }
+
+  public String getQueryTag() {
+    return queryState.getQueryTag();
+  }
+
+  public String getQueryId() {
+    return queryState.getQueryId();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5336034..8db6a29 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -62,6 +62,7 @@ public class OperationManager extends AbstractService {
       new ConcurrentHashMap<OperationHandle, Operation>();
   private final ConcurrentHashMap<String, Operation> queryIdOperation =
       new ConcurrentHashMap<String, Operation>();
+  private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
 
   //Following fields for displaying queries on WebUI
   private Object webuiLock = new Object();
@@ -201,11 +202,32 @@ public class OperationManager extends AbstractService {
     }
   }
 
+  public void updateQueryTag(String queryId, String queryTag) {
+    Operation operation = queryIdOperation.get(queryId);
+    if (operation != null) {
+      String queryIdTemp = queryTagToIdMap.get(queryTag);
+      if (queryIdTemp != null) {
+        throw new RuntimeException("tag " + queryTag + " is already applied for query " + queryIdTemp);
+      }
+      queryTagToIdMap.put(queryTag, queryId);
+      LOG.info("Query " + queryId + " is updated with tag " + queryTag);
+      return;
+    }
+    LOG.info("Query id is missing during query tag update");
+  }
+
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
+    if (operation == null) {
+      throw new RuntimeException("Operation does not exist: " + opHandle);
+    }
     String queryId = getQueryId(operation);
     queryIdOperation.remove(queryId);
-    LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
+    String queryTag = operation.getQueryTag();
+    if (queryTag != null) {
+      queryTagToIdMap.remove(queryTag);
+    }
+    LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId, opHandle, queryTag);
     if (operation instanceof SQLOperation) {
       removeSafeQueryInfo(opHandle);
     }
@@ -285,9 +307,6 @@ public class OperationManager extends AbstractService {
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
     LOG.info("Closing operation: " + opHandle);
     Operation operation = removeOperation(opHandle);
-    if (operation == null) {
-      throw new HiveSQLException("Operation does not exist: " + opHandle);
-    }
     Metrics metrics = MetricsFactory.getInstance();
     if (metrics != null) {
       try {
@@ -422,4 +441,12 @@ public class OperationManager extends AbstractService {
   public Operation getOperationByQueryId(String queryId) {
     return queryIdOperation.get(queryId);
   }
+
+  public Operation getOperationByQueryTag(String queryTag) {
+    String queryId = queryTagToIdMap.get(queryTag);
+    if (queryId != null) {
+      return getOperationByQueryId(queryId);
+    }
+    return null;
+  }
 }
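
The queryTagToIdMap above is a second index over the same operations: SQLOperation calls
updateQueryTag once the statement has compiled, and getOperationByQueryTag just resolves
tag -> id -> Operation. A hypothetical invariant check (the manager and query id are
assumed to come from a live HS2 instance; the tag value is illustrative):

  import org.apache.hive.service.cli.operation.Operation;
  import org.apache.hive.service.cli.operation.OperationManager;

  class TagIndexCheck {
    static void check(OperationManager mgr, String queryId) {
      mgr.updateQueryTag(queryId, "replTag1");  // a second call with the same tag throws
      Operation byTag = mgr.getOperationByQueryTag("replTag1");
      // The tag is only an alias: it resolves to the same operation as the query id.
      assert byTag == mgr.getOperationByQueryId(queryId);
    }
  }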

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9a07fa1..36df57e 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -198,7 +198,9 @@ public class SQLOperation extends ExecuteStatementOperation {
       if (0 != response.getResponseCode()) {
         throw toSQLException("Error while compiling statement", response);
       }
-
+      if (queryState.getQueryTag() != null && queryState.getQueryId() != null) {
+        parentSession.updateQueryTag(queryState.getQueryId(), queryState.getQueryTag());
+      }
       setHasResultSet(driver.hasResultSet());
     } catch (HiveSQLException e) {
       setState(OperationState.ERROR);

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index b4070ce..cce9c22 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -200,6 +200,8 @@ public interface HiveSession extends HiveSessionBase {
 
   void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
 
+  void updateQueryTag(String queryId, String queryTag) throws HiveSQLException;
+
   void closeOperation(OperationHandle opHandle) throws HiveSQLException;
 
   TableSchema getResultSetMetadata(OperationHandle opHandle)

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b9a8537..e5cdc7b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -874,6 +874,11 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   @Override
+  public void updateQueryTag(String queryId, String queryTag) throws HiveSQLException {
+    sessionManager.getOperationManager().updateQueryTag(queryId, queryTag);
+  }
+
+  @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
     acquire(true, false);
     try {
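
The KillQueryImpl changes below do the YARN-side work: getChildYarnJobs asks the resource
manager for the caller's own applications carrying the tag, and killChildYarnJobs kills
each of them through YarnClient. A hedged standalone use of the new helper (the conf is
whatever points at the right cluster; the tag value is illustrative):

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hive.service.server.KillQueryImpl;

  public class KillOrphanedReplJobs {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();
      // Kills every YARN application owned by the caller that carries this tag,
      // e.g. distcp jobs left behind by a tagged REPL LOAD after an HS2 restart.
      KillQueryImpl.killChildYarnJobs(conf, "replTag1");
    }
  }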

http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index b39a7b1..490a04d 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -18,8 +18,20 @@
 
 package org.apache.hive.service.server;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.operation.Operation;
@@ -27,6 +39,12 @@ import org.apache.hive.service.cli.operation.OperationManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
@@ -36,18 +54,82 @@ public class KillQueryImpl implements KillQuery {
     this.operationManager = operationManager;
   }
 
+  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException {
+    Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+    GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+    gar.setScope(ApplicationsRequestScope.OWN);
+    gar.setApplicationTags(Collections.singleton(tag));
+
+    ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
+    GetApplicationsResponse apps = proxy.getApplications(gar);
+    List<ApplicationReport> appsList = apps.getApplicationList();
+    for(ApplicationReport appReport : appsList) {
+      childYarnJobs.add(appReport.getApplicationId());
+    }
+
+    if (childYarnJobs.isEmpty()) {
+      LOG.info("No child applications found");
+    } else {
+      LOG.info("Found child YARN applications: " + StringUtils.join(childYarnJobs, ","));
+    }
+
+    return childYarnJobs;
+  }
+
+  public static void killChildYarnJobs(Configuration conf, String tag) {
+    try {
+      if (tag == null) {
+        return;
+      }
+      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+      if (!childYarnJobs.isEmpty()) {
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childYarnJobs) {
+          yarnClient.killApplication(app);
+        }
+      }
+    } catch (IOException | YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    }
+  }
+
   @Override
-  public void killQuery(String queryId, String errMsg) throws HiveException {
+  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
     try {
+      String queryTag = null;
+
       Operation operation = operationManager.getOperationByQueryId(queryId);
       if (operation == null) {
-        LOG.info("Query not found: " + queryId);
+        // Check if the user passed a query tag to kill the operation. This can happen if the application
+        // restarted and no longer has the proper query id; the tag can then be used to kill the query.
+        operation = operationManager.getOperationByQueryTag(queryId);
+        if (operation == null) {
+          LOG.info("Query not found: " + queryId);
+        }
       } else {
+        // This is the normal flow, where the query is tagged and the user kills it by query id.
+        queryTag = operation.getQueryTag();
+      }
+
+      if (queryTag == null) {
+        // Use the query id as the tag if the user only wants to kill the yarn jobs after a hive server
+        // restart. The yarn jobs are tagged with the query id by default, so this covers the case where
+        // an application, after a restart, passes the query tag in place of the query id.
+        queryTag = queryId;
+      }
+
+      LOG.info("Killing yarn jobs for query id: " + queryId + " using tag: " + queryTag);
+      killChildYarnJobs(conf, queryTag);
+
+      if (operation != null) {
         OperationHandle handle = operation.getHandle();
         operationManager.cancelOperation(handle, errMsg);
       }
     } catch (HiveSQLException e) {
-      throw new HiveException(e);
+      LOG.error("Kill query failed for query " + queryId, e);
+      throw new HiveException(e.getMessage(), e);
     }
   }
 }