You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/08/13 05:46:58 UTC
hive git commit: HIVE-19924: Tag distcp jobs run by Repl Load (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Repository: hive
Updated Branches:
refs/heads/master 4a30574d3 -> 250e10ecf
HIVE-19924: Tag distcp jobs run by Repl Load (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/250e10ec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/250e10ec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/250e10ec
Branch: refs/heads/master
Commit: 250e10ecf00e4b1e2536ca943be7e9068d699a6c
Parents: 4a30574
Author: Sankar Hariappan <sa...@apache.org>
Authored: Mon Aug 13 11:16:32 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Mon Aug 13 11:16:32 2018 +0530
----------------------------------------------------------------------
.../apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 3 +-
.../org/apache/hive/jdbc/TestJdbcDriver2.java | 28 ++++
.../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 66 ----------
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 128 ++++++++++++++++++-
.../org/apache/hadoop/hive/ql/QueryState.java | 27 ++++
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../org/apache/hadoop/hive/ql/exec/Task.java | 5 +
.../ql/exec/tez/KillTriggerActionHandler.java | 3 +-
.../hive/ql/exec/tez/WorkloadManager.java | 2 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 58 +++++----
.../hadoop/hive/ql/session/KillQuery.java | 3 +-
.../hadoop/hive/ql/session/NullKillQuery.java | 3 +-
.../org/apache/hive/service/cli/CLIService.java | 2 +-
.../hive/service/cli/operation/Operation.java | 8 ++
.../service/cli/operation/OperationManager.java | 35 ++++-
.../service/cli/operation/SQLOperation.java | 4 +-
.../hive/service/cli/session/HiveSession.java | 2 +
.../service/cli/session/HiveSessionImpl.java | 5 +
.../hive/service/server/KillQueryImpl.java | 88 ++++++++++++-
19 files changed, 361 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 98f4729..d2e9514 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -110,7 +110,7 @@ public abstract class BaseJdbcWithMiniLlap {
private static Connection hs2Conn = null;
// This method should be called by sub-classes in a @BeforeClass initializer
- public static void beforeTest(HiveConf inputConf) throws Exception {
+ public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
conf = inputConf;
Class.forName(MiniHS2.getJdbcDriverName());
miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
@@ -120,6 +120,7 @@ public abstract class BaseJdbcWithMiniLlap {
Map<String, String> confOverlay = new HashMap<String, String>();
miniHS2.start(confOverlay);
miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+ return miniHS2;
}
static HiveConf defaultConf() throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f552b0..c2f5703 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3036,6 +3036,34 @@ public class TestJdbcDriver2 {
}
}
+ @Test
+ public void testGetQueryId() throws Exception {
+ HiveStatement stmt = (HiveStatement) con.createStatement();
+ HiveStatement stmt1 = (HiveStatement) con.createStatement();
+ stmt.executeAsync("create database query_id_test with dbproperties ('repl.source.for' = '1, 2, 3')");
+ String queryId = stmt.getQueryId();
+ assertFalse(queryId.isEmpty());
+ stmt.getUpdateCount();
+
+ stmt1.executeAsync("repl status query_id_test with ('hive.query.id' = 'hiveCustomTag')");
+ String queryId1 = stmt1.getQueryId();
+ assertFalse("hiveCustomTag".equals(queryId1));
+ assertFalse(queryId.equals(queryId1));
+ assertFalse(queryId1.isEmpty());
+ stmt1.getUpdateCount();
+
+ stmt.executeAsync("select count(*) from " + dataTypeTableName);
+ queryId = stmt.getQueryId();
+ assertFalse("hiveCustomTag".equals(queryId));
+ assertFalse(queryId.isEmpty());
+ assertFalse(queryId.equals(queryId1));
+ stmt.getUpdateCount();
+
+ stmt.execute("drop database query_id_test");
+ stmt.close();
+ stmt1.close();
+ }
+
// Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
@Test(expected = HiveSQLException.class)
public void testConnectInvalidDatabase() throws SQLException {
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 2139709..5cb0a88 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -21,7 +21,6 @@ package org.apache.hive.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1477,71 +1476,6 @@ public class TestJdbcWithMiniHS2 {
}
}
- /**
- * Test CLI kill command of a query that is running.
- * We spawn 2 threads - one running the query and
- * the other attempting to cancel.
- * We're using a dummy udf to simulate a query,
- * that runs for a sufficiently long time.
- * @throws Exception
- */
- @Test
- public void testKillQuery() throws Exception {
- Connection con = conTestDb;
- Connection con2 = getConnection(testDbName);
-
- String udfName = SleepMsUDF.class.getName();
- Statement stmt1 = con.createStatement();
- final Statement stmt2 = con2.createStatement();
- stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
- stmt1.close();
- final Statement stmt = con.createStatement();
- final ExceptionHolder tExecuteHolder = new ExceptionHolder();
- final ExceptionHolder tKillHolder = new ExceptionHolder();
-
- // Thread executing the query
- Thread tExecute = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- System.out.println("Executing query: ");
- // The test table has 500 rows, so total query time should be ~ 500*500ms
- stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
- "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
- fail("Expecting SQLException");
- } catch (SQLException e) {
- tExecuteHolder.throwable = e;
- }
- }
- });
- // Thread killing the query
- Thread tKill = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(2000);
- String queryId = ((HiveStatement) stmt).getQueryId();
- System.out.println("Killing query: " + queryId);
-
- stmt2.execute("kill query '" + queryId + "'");
- stmt2.close();
- } catch (Exception e) {
- tKillHolder.throwable = e;
- }
- }
- });
-
- tExecute.start();
- tKill.start();
- tExecute.join();
- tKill.join();
- stmt.close();
- con2.close();
-
- assertNotNull("tExecute", tExecuteHolder.throwable);
- assertNull("tCancel", tKillHolder.throwable);
- }
-
private static class ExceptionHolder {
Throwable throwable;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index c02980b..4942ed9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -30,21 +30,55 @@ import org.apache.hadoop.io.NullWritable;
import org.junit.BeforeClass;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-
+import org.junit.AfterClass;
+import org.junit.Test;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Connection;
+import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
/**
* TestJdbcWithMiniLlap for Arrow format
*/
public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
+ private static MiniHS2 miniHS2 = null;
+ private static final String tableName = "testJdbcMinihs2Tbl";
+ private static String dataFileDir;
+ private static final String testDbName = "testJdbcMinihs2";
+ private static class ExceptionHolder {
+ Throwable throwable;
+ }
@BeforeClass
public static void beforeTest() throws Exception {
HiveConf conf = defaultConf();
conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
- BaseJdbcWithMiniLlap.beforeTest(conf);
+ MiniHS2.cleanupLocalDir();
+ miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
+ dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+
+ Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
+ System.getProperty("user.name"), "bar");
+ Statement stmt = conDefault.createStatement();
+ stmt.execute("drop database if exists " + testDbName + " cascade");
+ stmt.execute("create database " + testDbName);
+ stmt.close();
+ conDefault.close();
+ }
+
+ @AfterClass
+ public static void afterTest() {
+ if (miniHS2 != null && miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
}
@Override
@@ -230,5 +264,95 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
}
+ /**
+ * SleepMsUDF
+ */
+ public static class SleepMsUDF extends UDF {
+ public Integer evaluate(int value, int ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ // No-op
+ }
+ return value;
+ }
+ }
+
+ /**
+ * Test CLI kill command of a query that is running.
+ * We spawn 2 threads - one running the query and
+ * the other attempting to cancel.
+ * We're using a dummy udf to simulate a query,
+ * that runs for a sufficiently long time.
+ * @throws Exception
+ */
+ @Test
+ public void testKillQuery() throws Exception {
+ Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+ System.getProperty("user.name"), "bar");
+ Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+ System.getProperty("user.name"), "bar");
+
+ String udfName = SleepMsUDF.class.getName();
+ Statement stmt1 = con.createStatement();
+ final Statement stmt2 = con2.createStatement();
+ Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+
+ String tblName = testDbName + "." + tableName;
+
+ stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
+ stmt1.execute("create table " + tblName + " (int_col int, value string) ");
+ stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tblName);
+
+
+ stmt1.close();
+ final Statement stmt = con.createStatement();
+ final ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ final ExceptionHolder tKillHolder = new ExceptionHolder();
+
+ // Thread executing the query
+ Thread tExecute = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ System.out.println("Executing query: ");
+ stmt.execute("set hive.llap.execution.mode = none");
+
+ // The test table has 500 rows and each row sleeps 100ms, so total query time should be at least ~ 500*100ms
+ stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
+ "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
+ } catch (SQLException e) {
+ tExecuteHolder.throwable = e;
+ }
+ }
+ });
+ // Thread killing the query
+ Thread tKill = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(5000);
+ String queryId = ((HiveStatement) stmt).getQueryId();
+ System.out.println("Killing query: " + queryId);
+ stmt2.execute("kill query '" + queryId + "'");
+ stmt2.close();
+ } catch (Exception e) {
+ tKillHolder.throwable = e;
+ }
+ }
+ });
+
+ tExecute.start();
+ tKill.start();
+ tExecute.join();
+ tKill.join();
+ stmt.close();
+ con2.close();
+ con.close();
+
+ assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertNull("tCancel", tKillHolder.throwable);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b1a602c..028dd60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.mapreduce.MRJobConfig;
/**
* The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +55,10 @@ public class QueryState {
*/
private long numModifiedRows = 0;
+ // Holds the tag supplied by user to uniquely identify the query. Can be used to kill the query if the query
+ // id cannot be queried for some reason like hive server restart.
+ private String queryTag = null;
+
/**
* Private constructor, use QueryState.Builder instead.
* @param conf The query specific configuration object
@@ -62,6 +67,7 @@ public class QueryState {
this.queryConf = conf;
}
+ // Get the query id stored in query specific config.
public String getQueryId() {
return (queryConf.getVar(HiveConf.ConfVars.HIVEQUERYID));
}
@@ -112,6 +118,25 @@ public class QueryState {
public void setNumModifiedRows(long numModifiedRows) {
this.numModifiedRows = numModifiedRows;
}
+
+ public String getQueryTag() {
+ return queryTag;
+ }
+
+ public void setQueryTag(String queryTag) {
+ this.queryTag = queryTag;
+ }
+
+ public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
+ String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
+ if (jobTag == null) {
+ jobTag = queryTag;
+ } else {
+ jobTag = jobTag.concat("," + queryTag);
+ }
+ queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+ }
+
/**
* Builder to instantiate the QueryState object.
*/
@@ -221,6 +246,8 @@ public class QueryState {
if (generateNewQueryId) {
String queryId = QueryPlan.makeQueryId();
queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
+ setMapReduceJobTag(queryConf, queryId);
+
// FIXME: druid storage handler relies on query.id to maintain some staging directories
// expose queryid to session level
if (hiveConf != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index accd7f1..467f728 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3296,7 +3296,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
private int killQuery(Hive db, KillQueryDesc desc) throws HiveException {
SessionState sessionState = SessionState.get();
for (String queryId : desc.getQueryIds()) {
- sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY");
+ sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY", db.getConf());
}
LOG.info("kill query called ({})", desc.getQueryIds());
return 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 240208a..11ef62c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -202,6 +203,10 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
if (hiveHistory != null) {
hiveHistory.logPlanProgress(queryPlan);
}
+
+ if (conf != null) {
+ LOG.debug("Task getting executed using mapred tag : " + conf.get(MRJobConfig.JOB_TAGS));
+ }
int retval = execute(driverContext);
this.setDone();
if (hiveHistory != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 50d234d..f357775 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -42,7 +42,8 @@ public class KillTriggerActionHandler implements TriggerActionHandler<TezSession
KillQuery killQuery = sessionState.getKillQuery();
// if kill query is null then session might have been released to pool or closed already
if (killQuery != null) {
- sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
+ sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(),
+ sessionState.getConf());
}
} catch (HiveException e) {
LOG.warn("Unable to kill query {} for trigger violation");
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 7137a17..5326e35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -438,7 +438,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
try {
- kq.killQuery(queryId, reason);
+ kq.killQuery(queryId, reason, toKill.getConf());
addKillQueryResult(toKill, true);
killCtx.killSessionFuture.set(true);
wmEvent.endEvent(toKill);
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adaa3d3..e4186c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -228,20 +230,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
break;
case TOK_REPL_CONFIG:
- Map<String, String> replConfigs
- = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
- if (null != replConfigs) {
- for (Map.Entry<String, String> config : replConfigs.entrySet()) {
- conf.set(config.getKey(), config.getValue());
- }
-
- // As hive conf is changed, need to get the Hive DB again with it.
- try {
- db = Hive.get(conf);
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
- }
+ setConfigs((ASTNode) childNode.getChild(0));
break;
default:
throw new SemanticException("Unrecognized token in REPL LOAD statement");
@@ -360,6 +349,32 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ private void setConfigs(ASTNode node) throws SemanticException {
+ Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
+ if (null != replConfigs) {
+ for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+ String key = config.getKey();
+ // don't set the query id in the config
+ if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
+ String queryTag = config.getValue();
+ if (!StringUtils.isEmpty(queryTag)) {
+ QueryState.setMapReduceJobTag(conf, queryTag);
+ }
+ queryState.setQueryTag(queryTag);
+ } else {
+ conf.set(key, config.getValue());
+ }
+ }
+
+ // As hive conf is changed, need to get the Hive DB again with it.
+ try {
+ db = Hive.get(conf);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+
// REPL STATUS
private void initReplStatus(ASTNode ast) throws SemanticException{
dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -371,20 +386,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
break;
case TOK_REPL_CONFIG:
- Map<String, String> replConfigs
- = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
- if (null != replConfigs) {
- for (Map.Entry<String, String> config : replConfigs.entrySet()) {
- conf.set(config.getKey(), config.getValue());
- }
-
- // As hive conf is changed, need to get the Hive DB again with it.
- try {
- db = Hive.get(conf);
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
- }
+ setConfigs((ASTNode) childNode.getChild(0));
break;
default:
throw new SemanticException("Unrecognized token in REPL STATUS statement");
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
index 2e183dc..01dc7e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.hive.ql.session;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
public interface KillQuery {
- void killQuery(String queryId, String errMsg) throws HiveException;
+ void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
index b62f22c..eac2936 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
@@ -18,11 +18,12 @@
package org.apache.hadoop.hive.ql.session;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
public class NullKillQuery implements KillQuery {
@Override
- public void killQuery(String queryId, String errMsg) throws HiveException {
+ public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
// Do nothing
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index e28e513..9cbe7e1 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -624,7 +624,7 @@ public class CLIService extends CompositeService implements ICLIService {
public String getQueryId(TOperationHandle opHandle) throws HiveSQLException {
Operation operation = sessionManager.getOperationManager().getOperation(
new OperationHandle(opHandle));
- final String queryId = operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
+ final String queryId = operation.getQueryId();
LOG.debug(opHandle + ": getQueryId() " + queryId);
return queryId;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 1ee0756..4b9cbd3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -413,4 +413,12 @@ public abstract class Operation {
protected void markOperationCompletedTime() {
operationComplete = System.currentTimeMillis();
}
+
+ public String getQueryTag() {
+ return queryState.getQueryTag();
+ }
+
+ public String getQueryId() {
+ return queryState.getQueryId();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5336034..8db6a29 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -62,6 +62,7 @@ public class OperationManager extends AbstractService {
new ConcurrentHashMap<OperationHandle, Operation>();
private final ConcurrentHashMap<String, Operation> queryIdOperation =
new ConcurrentHashMap<String, Operation>();
+ private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
//Following fields for displaying queries on WebUI
private Object webuiLock = new Object();
@@ -201,11 +202,32 @@ public class OperationManager extends AbstractService {
}
}
+ public void updateQueryTag(String queryId, String queryTag) {
+ Operation operation = queryIdOperation.get(queryId);
+ if (operation != null) {
+ String queryIdTemp = queryTagToIdMap.get(queryTag);
+ if (queryIdTemp != null) {
+ throw new RuntimeException("tag " + queryTag + " is already applied for query " + queryIdTemp);
+ }
+ queryTagToIdMap.put(queryTag, queryId);
+ LOG.info("Query " + queryId + " is updated with tag " + queryTag);
+ return;
+ }
+ LOG.info("Query id is missing during query tag updation");
+ }
+
private Operation removeOperation(OperationHandle opHandle) {
Operation operation = handleToOperation.remove(opHandle);
+ if (operation == null) {
+ throw new RuntimeException("Operation does not exist: " + opHandle);
+ }
String queryId = getQueryId(operation);
queryIdOperation.remove(queryId);
- LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
+ String queryTag = operation.getQueryTag();
+ if (queryTag != null) {
+ queryTagToIdMap.remove(queryTag);
+ }
+ LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId, opHandle, queryTag);
if (operation instanceof SQLOperation) {
removeSafeQueryInfo(opHandle);
}
@@ -285,9 +307,6 @@ public class OperationManager extends AbstractService {
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
LOG.info("Closing operation: " + opHandle);
Operation operation = removeOperation(opHandle);
- if (operation == null) {
- throw new HiveSQLException("Operation does not exist: " + opHandle);
- }
Metrics metrics = MetricsFactory.getInstance();
if (metrics != null) {
try {
@@ -422,4 +441,12 @@ public class OperationManager extends AbstractService {
public Operation getOperationByQueryId(String queryId) {
return queryIdOperation.get(queryId);
}
+
+ public Operation getOperationByQueryTag(String queryTag) {
+ String queryId = queryTagToIdMap.get(queryTag);
+ if (queryId != null) {
+ return getOperationByQueryId(queryId);
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9a07fa1..36df57e 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -198,7 +198,9 @@ public class SQLOperation extends ExecuteStatementOperation {
if (0 != response.getResponseCode()) {
throw toSQLException("Error while compiling statement", response);
}
-
+ if (queryState.getQueryTag() != null && queryState.getQueryId() != null) {
+ parentSession.updateQueryTag(queryState.getQueryId(), queryState.getQueryTag());
+ }
setHasResultSet(driver.hasResultSet());
} catch (HiveSQLException e) {
setState(OperationState.ERROR);
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index b4070ce..cce9c22 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -200,6 +200,8 @@ public interface HiveSession extends HiveSessionBase {
void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+ /**
+ * Passes a query's tag, together with its query id, to the session.
+ *
+ * SQLOperation calls this after a successful compile, and only when both the query id and the
+ * query tag are non-null. HiveSessionImpl forwards the call to the OperationManager.
+ *
+ * @param queryId id of the query being tagged
+ * @param queryTag tag associated with that query
+ * @throws HiveSQLException declared for implementations; NOTE(review): the conditions under which it is thrown are not visible here
+ */
+ void updateQueryTag(String queryId, String queryTag) throws HiveSQLException;
+
void closeOperation(OperationHandle opHandle) throws HiveSQLException;
TableSchema getResultSetMetadata(OperationHandle opHandle)
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b9a8537..e5cdc7b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -874,6 +874,11 @@ public class HiveSessionImpl implements HiveSession {
}
@Override
+ public void updateQueryTag(String queryId, String queryTag) throws HiveSQLException {
+ // Pure delegation: the tag is handed to the OperationManager obtained from the session manager.
+ // NOTE(review): unlike closeOperation below, this does not call acquire(...) first - confirm that is intended.
+ sessionManager.getOperationManager().updateQueryTag(queryId, queryTag);
+ }
+
+ @Override
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
acquire(true, false);
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index b39a7b1..490a04d 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -18,8 +18,20 @@
package org.apache.hive.service.server;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.operation.Operation;
@@ -27,6 +39,12 @@ import org.apache.hive.service.cli.operation.OperationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
public class KillQueryImpl implements KillQuery {
private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
@@ -36,18 +54,82 @@ public class KillQueryImpl implements KillQuery {
this.operationManager = operationManager;
}
+  /**
+   * Asks the YARN ResourceManager for the applications that carry the given application tag.
+   *
+   * The request uses {@code ApplicationsRequestScope.OWN} and a single-element tag set. The ids of
+   * all returned application reports are collected and logged at INFO level.
+   *
+   * @param conf configuration used to create the ResourceManager proxy
+   * @param tag application tag to search for
+   * @return ids of the matching applications; empty (never {@code null}) when none match
+   * @throws IOException declared by the ResourceManager proxy creation / call
+   * @throws YarnException declared by the ResourceManager call
+   */
+  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException {
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance();
+    request.setScope(ApplicationsRequestScope.OWN);
+    request.setApplicationTags(Collections.singleton(tag));
+
+    ApplicationClientProtocol rmClient = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
+    Set<ApplicationId> taggedApps = new HashSet<>();
+    for (ApplicationReport report : rmClient.getApplications(request).getApplicationList()) {
+      taggedApps.add(report.getApplicationId());
+    }
+
+    if (!taggedApps.isEmpty()) {
+      LOG.info("Found child YARN applications: " + StringUtils.join(taggedApps, ","));
+    } else {
+      LOG.info("No child applications found");
+    }
+    return taggedApps;
+  }
+
+  /**
+   * Kills every YARN application that carries the given tag.
+   *
+   * The applications are located with {@link #getChildYarnJobs(Configuration, String)}. A
+   * {@code YarnClient} is created only when at least one application was found, and it is always
+   * stopped afterwards so that its connection to the ResourceManager is released.
+   *
+   * @param conf configuration used for the lookup and for the YARN client
+   * @param tag application tag identifying the jobs to kill; {@code null} makes this a no-op
+   * @throws RuntimeException wrapping any {@code IOException} or {@code YarnException} raised
+   *         while looking up or killing the applications
+   */
+  public static void killChildYarnJobs(Configuration conf, String tag) {
+    if (tag == null) {
+      return;
+    }
+    try {
+      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+      if (childYarnJobs.isEmpty()) {
+        return;
+      }
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      try {
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childYarnJobs) {
+          yarnClient.killApplication(app);
+        }
+      } finally {
+        // Previously the started client was never stopped, leaking it on every kill request.
+        yarnClient.stop();
+      }
+    } catch (IOException | YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    }
+  }
+
+ /**
+ * Kills the YARN applications tagged for a query and then cancels the matching operation, if any.
+ *
+ * The {@code queryId} argument is tried as a query id first and, when no operation matches, as a
+ * query tag. The YARN jobs are killed even when no operation is found.
+ *
+ * @param queryId query id, or a query tag, identifying what to kill
+ * @param errMsg message passed through to {@code operationManager.cancelOperation}
+ * @param conf configuration passed through to {@code killChildYarnJobs}
+ * @throws HiveException wrapping a {@code HiveSQLException} raised inside the try block
+ */
@Override
- public void killQuery(String queryId, String errMsg) throws HiveException {
+ public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
try {
+ String queryTag = null;
+
Operation operation = operationManager.getOperationByQueryId(queryId);
if (operation == null) {
- LOG.info("Query not found: " + queryId);
+ // The caller may have passed a query tag instead of a query id. This happens when the calling application
+ // has restarted and no longer knows the query id; the tag can then be used to kill the query.
+ operation = operationManager.getOperationByQueryTag(queryId);
+ if (operation == null) {
+ LOG.info("Query not found: " + queryId);
+ }
} else {
+ // Normal flow: the query is tagged and the caller identified it by its query id.
+ queryTag = operation.getQueryTag();
+ }
+
+ if (queryTag == null) {
+ // Fall back to the supplied id as the tag. YARN jobs are tagged with the query id by default, and after a
+ // Hive server restart the caller may pass the query tag in place of the query id, so the supplied value is
+ // the right tag in both cases.
+ queryTag = queryId;
+ }
+
+ LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" + queryTag);
+ // NOTE(review): killChildYarnJobs rethrows IOException/YarnException as RuntimeException, which the catch
+ // below does not handle; in that case the exception propagates as-is and the cancel further down is skipped.
+ killChildYarnJobs(conf, queryTag);
+
+ if (operation != null) {
OperationHandle handle = operation.getHandle();
operationManager.cancelOperation(handle, errMsg);
}
} catch (HiveSQLException e) {
- throw new HiveException(e);
+ LOG.error("Kill query failed for query " + queryId, e);
+ throw new HiveException(e.getMessage(), e);
}
}
}