You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/11/13 21:50:28 UTC
[2/2] hive git commit: HIVE-17809: Implement per pool trigger validation and move sessions across pools (Prasanth Jayachandran reviewed by Sergey Shelukhin)
HIVE-17809: Implement per pool trigger validation and move sessions across pools (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/51249505
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/51249505
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/51249505
Branch: refs/heads/master
Commit: 51249505c7b3db6bfa586cb1b6dee268f2ed6ce6
Parents: 7150f9c
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Nov 13 13:48:19 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Nov 13 13:48:19 2017 -0800
----------------------------------------------------------------------
data/conf/hive-log4j2.properties | 11 +-
.../hive/jdbc/AbstractJdbcTriggersTest.java | 16 +-
.../jdbc/TestTriggersMoveWorkloadManager.java | 154 ++++++++
.../hive/jdbc/TestTriggersNoTezSessionPool.java | 7 +-
.../jdbc/TestTriggersTezSessionPoolManager.java | 53 +--
.../hive/jdbc/TestTriggersWorkloadManager.java | 29 +-
.../exec/tez/KillMoveTriggerActionHandler.java | 83 +++++
.../ql/exec/tez/KillTriggerActionHandler.java | 10 +-
.../tez/PerPoolTriggerValidatorRunnable.java | 66 ++++
.../hive/ql/exec/tez/TezSessionPoolManager.java | 28 +-
.../hive/ql/exec/tez/TezSessionPoolSession.java | 12 +-
.../ql/exec/tez/TriggerValidatorRunnable.java | 57 ++-
.../exec/tez/TriggerViolationActionHandler.java | 48 ---
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 7 +-
.../hive/ql/exec/tez/WorkloadManager.java | 348 ++++++++++++-------
.../ql/exec/tez/WorkloadManagerFederation.java | 2 +-
.../org/apache/hadoop/hive/ql/wm/Action.java | 107 ++++++
.../hadoop/hive/ql/wm/CustomCounterLimit.java | 4 +-
.../hadoop/hive/ql/wm/ExecutionTrigger.java | 22 +-
.../apache/hadoop/hive/ql/wm/Expression.java | 4 +-
.../hadoop/hive/ql/wm/ExpressionFactory.java | 2 +-
.../hive/ql/wm/FileSystemCounterLimit.java | 6 +-
.../ql/wm/MetastoreGlobalTriggersFetcher.java | 20 +-
.../MetastoreResourcePlanTriggersFetcher.java | 38 --
.../hive/ql/wm/SessionTriggerProvider.java | 33 +-
.../hadoop/hive/ql/wm/TimeCounterLimit.java | 4 +-
.../org/apache/hadoop/hive/ql/wm/Trigger.java | 46 +--
.../hadoop/hive/ql/wm/TriggerActionHandler.java | 4 +-
.../hadoop/hive/ql/wm/TriggerContext.java | 12 +-
.../hadoop/hive/ql/wm/TriggerExpression.java | 2 +-
.../hadoop/hive/ql/wm/TriggersFetcher.java | 25 --
.../hadoop/hive/ql/wm/VertexCounterLimit.java | 4 +-
.../hive/ql/exec/tez/TestWorkloadManager.java | 239 ++++++++++++-
.../apache/hadoop/hive/ql/wm/TestTrigger.java | 36 +-
34 files changed, 1085 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/data/conf/hive-log4j2.properties
----------------------------------------------------------------------
diff --git a/data/conf/hive-log4j2.properties b/data/conf/hive-log4j2.properties
index 73c7d90..e5bb166 100644
--- a/data/conf/hive-log4j2.properties
+++ b/data/conf/hive-log4j2.properties
@@ -50,7 +50,7 @@ appender.DRFA.strategy.type = DefaultRolloverStrategy
appender.DRFA.strategy.max = 30
# list of all loggers
-loggers = HadoopIPC, HadoopSecurity, Hdfs, HdfsServer, HadoopMetrics2, Mortbay, Yarn, YarnServer, Tez, HadoopConf, Zookeeper, ServerCnxn, NIOServerCnxn, ClientCnxn, ClientCnxnSocket, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, Operator, Serde2Lazy, ObjectStore, CalcitePlanner, AmazonAws, ApacheHttp
+loggers = HadoopIPC, HadoopSecurity, Hdfs, HdfsServer, HadoopMetrics2, Mortbay, Yarn, YarnServer, Tez, HadoopConf, Zookeeper, ServerCnxn, NIOServerCnxn, ClientCnxn, ClientCnxnSocket, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, Operator, Serde2Lazy, ObjectStore, CalcitePlanner, AmazonAws, ApacheHttp, Thrift, Jetty, BlockStateChange
logger.HadoopIPC.name = org.apache.hadoop.ipc
logger.HadoopIPC.level = WARN
@@ -127,6 +127,15 @@ logger.AmazonAws.level = INFO
logger.ApacheHttp.name=org.apache.http
logger.ApacheHttp.level = INFO
+logger.Thrift.name = org.apache.thrift
+logger.Thrift.level = WARN
+
+logger.Jetty.name = org.eclipse.jetty
+logger.Jetty.level = WARN
+
+logger.BlockStateChange.name = BlockStateChange
+logger.BlockStateChange.level = WARN
+
# root logger
rootLogger.level = ${sys:hive.log.level}
rootLogger.appenderRefs = root, console
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 68d57ca..235e6c3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
@@ -102,7 +103,7 @@ public abstract class AbstractJdbcTriggersTest {
}
}
- private void createSleepUDF() throws SQLException {
+ void createSleepUDF() throws SQLException {
String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
Connection con = hs2Conn;
Statement stmt = con.createStatement();
@@ -110,7 +111,7 @@ public abstract class AbstractJdbcTriggersTest {
stmt.close();
}
- protected void runQueryWithTrigger(final String query, final List<String> setCmds,
+ void runQueryWithTrigger(final String query, final List<String> setCmds,
final String expect)
throws Exception {
@@ -149,7 +150,7 @@ public abstract class AbstractJdbcTriggersTest {
abstract void setupTriggers(final List<Trigger> triggers) throws Exception;
- protected List<String> getConfigs(String... more) {
+ List<String> getConfigs(String... more) {
List<String> setCmds = new ArrayList<>();
setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
setCmds.add("set mapred.min.split.size=100");
@@ -161,4 +162,11 @@ public abstract class AbstractJdbcTriggersTest {
}
return setCmds;
}
+
+ WMTrigger wmTriggerFromTrigger(Trigger trigger) {
+ WMTrigger result = new WMTrigger("rp", trigger.getName());
+ result.setTriggerExpression(trigger.getExpression().toString());
+ result.setActionExpression(trigger.getAction().toString());
+ return result;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
new file mode 100644
index 0000000..a983855
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.plan;
+import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.pool;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.wm.Action;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.Expression;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ Class.forName(MiniHS2.getJdbcDriverName());
+
+ String confDir = "../../data/conf/llap/";
+ HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+ System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+
+ conf = new HiveConf();
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default");
+ conf.setBoolean("hive.test.workload.management", true);
+ conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
+ conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
+ // don't want cache hits from llap io for testing filesystem bytes read counters
+ conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
+
+ conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ + "/tez-site.xml"));
+
+ miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+ dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+ kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
+ Map<String, String> confOverlay = new HashMap<>();
+ miniHS2.start(confOverlay);
+ miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+ }
+
+ @Test(timeout = 60000)
+ public void testTriggerMoveAndKill() throws Exception {
+ Expression moveExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
+ Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000");
+ Trigger moveTrigger = new ExecutionTrigger("slow_query_move", moveExpression,
+ new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+ Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
+ new Action(Action.Type.KILL_QUERY));
+ setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList(killTrigger));
+ String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+ " t2 on t1.under_col>=t2.under_col";
+ runQueryWithTrigger(query, null, killTrigger + " violated");
+ }
+
+ @Test(timeout = 60000)
+ public void testTriggerMoveEscapeKill() throws Exception {
+ Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+ Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000");
+ Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression,
+ new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+ Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
+ new Action(Action.Type.KILL_QUERY));
+ setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
+ String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName +
+ " t2 on t1.under_col==t2.under_col";
+ runQueryWithTrigger(query, null, null);
+ }
+
+ @Test(timeout = 60000)
+ public void testTriggerMoveConflictKill() throws Exception {
+ Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+ Expression killExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+ Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression,
+ new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+ Trigger killTrigger = new ExecutionTrigger("kill_big_read", killExpression,
+ new Action(Action.Type.KILL_QUERY));
+ setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
+ String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+ " t2 on t1.under_col>=t2.under_col";
+ runQueryWithTrigger(query, null, killTrigger + " violated");
+ }
+
+ @Override
+ protected void setupTriggers(final List<Trigger> triggers) throws Exception {
+ setupTriggers(triggers, new ArrayList<>());
+ }
+
+ private void setupTriggers(final List<Trigger> biTriggers, final List<Trigger> etlTriggers) throws Exception {
+ WorkloadManager wm = WorkloadManager.getInstance();
+ WMPool biPool = pool("BI", 1, 0.8f);
+ WMPool etlPool = pool("ETL", 1, 0.2f);
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(biPool, etlPool));
+ plan.getPlan().setDefaultPoolPath("BI");
+
+ for (Trigger trigger : biTriggers) {
+ plan.addToTriggers(wmTriggerFromTrigger(trigger));
+ plan.addToPoolTriggers(new WMPoolTrigger("BI", trigger.getName()));
+ }
+
+ for (Trigger trigger : etlTriggers) {
+ plan.addToTriggers(wmTriggerFromTrigger(trigger));
+ plan.addToPoolTriggers(new WMPoolTrigger("ETL", trigger.getName()));
+ }
+ wm.updateResourcePlanAsync(plan).get(10, TimeUnit.SECONDS);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
index fb3af932..bcce3dc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.Expression;
import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
@@ -36,7 +37,7 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest {
@Test(timeout = 60000)
public void testTriggerSlowQueryExecutionTime() throws Exception {
Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
- Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
@@ -46,7 +47,7 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest {
@Test(timeout = 60000)
public void testTriggerTotalTasks() throws Exception {
Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50");
- Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index 7b87a65..b377275 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.Expression;
import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
@@ -37,7 +38,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testTriggerSlowQueryElapsedTime() throws Exception {
Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 20000");
- Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
@@ -47,7 +48,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testTriggerSlowQueryExecutionTime() throws Exception {
Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
- Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
@@ -57,7 +58,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testTriggerHighShuffleBytes() throws Exception {
Expression expression = ExpressionFactory.fromString("SHUFFLE_BYTES > 100");
- Trigger trigger = new ExecutionTrigger("big_shuffle", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("big_shuffle", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
List<String> cmds = new ArrayList<>();
cmds.add("set hive.auto.convert.join=false");
@@ -72,7 +73,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testTriggerHighBytesRead() throws Exception {
Expression expression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
- Trigger trigger = new ExecutionTrigger("big_read", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("big_read", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
@@ -82,7 +83,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testTriggerHighBytesWrite() throws Exception {
Expression expression = ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100");
- Trigger trigger = new ExecutionTrigger("big_write", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("big_write", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
@@ -92,7 +93,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testTriggerTotalTasks() throws Exception {
Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50");
- Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
@@ -102,11 +103,11 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testTriggerCustomReadOps() throws Exception {
Expression expression = ExpressionFactory.fromString("HDFS_READ_OPS > 50");
- Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
- runQueryWithTrigger(query, getConfigs(), "Query was cancelled");
+ runQueryWithTrigger(query, getConfigs(), trigger + " violated");
}
@Test(timeout = 120000)
@@ -114,20 +115,20 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
List<String> cmds = getConfigs();
Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5");
- Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query = "create table testtab2 as select * from " + tableName;
- runQueryWithTrigger(query, cmds, "Query was cancelled");
+ runQueryWithTrigger(query, cmds, trigger + " violated");
// partitioned insert
expression = ExpressionFactory.fromString("CREATED_FILES > 10");
- trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+ trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
cmds.add("drop table src3");
cmds.add("create table src3 (key int) partitioned by (value string)");
query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName +
" where under_col < 100";
- runQueryWithTrigger(query, cmds, "Query was cancelled");
+ runQueryWithTrigger(query, cmds, trigger + " violated");
}
@Test(timeout = 240000)
@@ -140,9 +141,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
String query =
"insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100";
Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
- Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
- runQueryWithTrigger(query, cmds, "Query was cancelled");
+ runQueryWithTrigger(query, cmds, trigger + " violated");
cmds = getConfigs();
// let it create 57 partitions without any triggers
@@ -154,9 +155,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
// query will try to add 64 more partitions to already existing 57 partitions but will get cancelled for violation
query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30");
- trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+ trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
- runQueryWithTrigger(query, cmds, "Query was cancelled");
+ runQueryWithTrigger(query, cmds, trigger + " violated");
// let it create 64 more partitions (total 57 + 64 = 121) without any triggers
query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
@@ -166,7 +167,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
// re-run insert into but this time no new partitions will be created, so there will be no violation
query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10");
- trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+ trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
runQueryWithTrigger(query, cmds, null);
}
@@ -184,9 +185,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
" insert overwrite table src2 partition (value) select * where under_col < 100 " +
" insert overwrite table src3 partition (value) select * where under_col >= 100 and under_col < 200";
Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70");
- Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
- runQueryWithTrigger(query, cmds, "Query was cancelled");
+ runQueryWithTrigger(query, cmds, trigger + " violated");
}
@Test(timeout = 60000)
@@ -203,15 +204,15 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
"union all " +
"select * from " + tableName + " where under_col >= 100 and under_col < 200) temps";
Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70");
- Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
- runQueryWithTrigger(query, cmds, "Query was cancelled");
+ runQueryWithTrigger(query, cmds, trigger + " violated");
}
@Test(timeout = 60000)
public void testTriggerCustomNonExistent() throws Exception {
Expression expression = ExpressionFactory.fromString("OPEN_FILES > 50");
- Trigger trigger = new ExecutionTrigger("non_existent", expression, Trigger.Action.KILL_QUERY);
+ Trigger trigger = new ExecutionTrigger("non_existent", expression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(trigger));
String query =
"select l.under_col, l.value from " + tableName + " l join " + tableName + " r on l.under_col>=r.under_col";
@@ -221,9 +222,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testMultipleTriggers1() throws Exception {
Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 1000000");
- Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+ Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY));
Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
- Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+ Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
@@ -233,9 +234,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
@Test(timeout = 60000)
public void testMultipleTriggers2() throws Exception {
Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
- Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+ Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY));
Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 100000");
- Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+ Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY));
setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index 79ba1f4..0506f67 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,44 +16,37 @@
package org.apache.hive.jdbc;
-import org.slf4j.Logger;
-
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
import org.junit.BeforeClass;
+import com.google.common.collect.Lists;
+
public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager {
- private final static Logger LOG = LoggerFactory.getLogger(TestTriggersWorkloadManager.class);
@BeforeClass
public static void beforeTest() throws Exception {
Class.forName(MiniHS2.getJdbcDriverName());
String confDir = "../../data/conf/llap/";
- if (confDir != null && !confDir.isEmpty()) {
- HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
- System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
- }
+ HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+ System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
conf = new HiveConf();
conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
@@ -73,7 +66,7 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
- Map<String, String> confOverlay = new HashMap<String, String>();
+ Map<String, String> confOverlay = new HashMap<>();
miniHS2.start(confOverlay);
miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
}
@@ -93,12 +86,4 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
}
wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
}
-
- private WMTrigger wmTriggerFromTrigger(Trigger trigger) {
- WMTrigger result = new WMTrigger("rp", trigger.getName());
- result.setTriggerExpression(trigger.getExpression().toString()); // TODO: hmm
- result.setActionExpression(trigger.getAction().toString()); // TODO: hmm
- LOG.debug("Produced " + result + " from " + trigger);
- return result;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
new file mode 100644
index 0000000..94b189b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Trigger action handler that understands both {@code KILL_QUERY} and {@code MOVE_TO_POOL} actions.
+ * Kills are issued through the kill-query handle of the violating session; moves are delegated to the
+ * {@link WorkloadManager} supplied at construction time.
+ */
+public class KillMoveTriggerActionHandler implements TriggerActionHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(KillMoveTriggerActionHandler.class);
+  private final WorkloadManager wm;
+
+  /**
+   * @param wm workload manager that is asked to perform the session moves
+   */
+  KillMoveTriggerActionHandler(final WorkloadManager wm) {
+    this.wm = wm;
+  }
+
+  /**
+   * Applies the action of the violated trigger to every session in the map.
+   * <p>
+   * Kill actions are executed inline. Move actions are first submitted for all sessions and only afterwards
+   * awaited, so that one slow move does not delay the submission of the others.
+   *
+   * @param queriesViolated sessions mapped to the trigger whose action has to be applied to them
+   * @throws RuntimeException if a session that has to be moved is not a {@link WmTezSession}, or if the
+   *           action type of a trigger is neither {@code KILL_QUERY} nor {@code MOVE_TO_POOL}
+   */
+  @Override
+  public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
+    Map<WmTezSession, Future<Boolean>> moveFutures = new HashMap<>(queriesViolated.size());
+    for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+      TezSessionState sessionState = entry.getKey();
+      Trigger trigger = entry.getValue();
+      switch (trigger.getAction().getType()) {
+        case KILL_QUERY:
+          String queryId = sessionState.getTriggerContext().getQueryId();
+          try {
+            // Same guard as KillTriggerActionHandler: without a kill-query handle there is nothing to kill
+            // (that handler notes the session may already have been released to the pool or closed).
+            if (sessionState.getKillQuery() != null) {
+              sessionState.getKillQuery().killQuery(queryId, trigger.getViolationMsg());
+            }
+          } catch (HiveException e) {
+            // supply the query id for the placeholder and keep the cause in the log
+            LOG.warn("Unable to kill query {} for trigger violation", queryId, e);
+          }
+          break;
+        case MOVE_TO_POOL:
+          if (sessionState instanceof WmTezSession) {
+            WmTezSession wmTezSession = (WmTezSession) sessionState;
+            String destPoolName = trigger.getAction().getPoolName();
+            Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
+            moveFutures.put(wmTezSession, moveFuture);
+          } else {
+            throw new RuntimeException("WmTezSession is expected. Got: " + sessionState.getClass().getSimpleName() +
+              ". SessionId: " + sessionState.getSessionId());
+          }
+          break;
+        default:
+          throw new RuntimeException("Unsupported action: " + trigger);
+      }
+    }
+
+    for (Map.Entry<WmTezSession, Future<Boolean>> entry : moveFutures.entrySet()) {
+      WmTezSession wmTezSession = entry.getKey();
+      Future<Boolean> moveFuture = entry.getValue();
+      try {
+        // block to make sure move happened successfully
+        if (moveFuture.get()) {
+          LOG.info("Moved session {} to pool {}", wmTezSession.getSessionId(), wmTezSession.getPoolName());
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt status for whoever owns this thread and stop waiting: with the flag set,
+        // every further Future.get() would fail immediately anyway.
+        Thread.currentThread().interrupt();
+        LOG.error("Interrupted while moving session {}", wmTezSession.getSessionId(), e);
+        return;
+      } catch (ExecutionException e) {
+        LOG.error("Exception while moving session {}", wmTezSession.getSessionId(), e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 474fae9..8c60b6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -32,9 +32,9 @@ public class KillTriggerActionHandler implements TriggerActionHandler {
private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class);
@Override
- public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) {
- for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) {
- switch (entry.getValue()) {
+ public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
+ for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+ switch (entry.getValue().getAction().getType()) {
case KILL_QUERY:
TezSessionState sessionState = entry.getKey();
String queryId = sessionState.getTriggerContext().getQueryId();
@@ -42,7 +42,7 @@ public class KillTriggerActionHandler implements TriggerActionHandler {
KillQuery killQuery = sessionState.getKillQuery();
// if kill query is null then session might have been released to pool or closed already
if (killQuery != null) {
- sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg());
+ sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
}
} catch (HiveException e) {
LOG.warn("Unable to kill query {} for trigger violation");
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
new file mode 100644
index 0000000..8f29197
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
+import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that makes sure every pool present in the supplied session trigger providers has its own
+ * {@link TriggerValidatorRunnable} scheduled. Each call to {@link #run()} only schedules validators for
+ * pools that do not have one yet; pools already handled by an earlier call are left untouched.
+ */
+public class PerPoolTriggerValidatorRunnable implements Runnable {
+  protected static transient Logger LOG = LoggerFactory.getLogger(PerPoolTriggerValidatorRunnable.class);
+  private final Map<String, SessionTriggerProvider> sessionTriggerProviders;
+  private final TriggerActionHandler triggerActionHandler;
+  private final Map<String, TriggerValidatorRunnable> poolValidators;
+  private final long triggerValidationIntervalMs;
+  // Created on first use and shared by all pool validators; see getValidatorExecutorService().
+  private ScheduledExecutorService validatorExecutorService;
+
+  /**
+   * @param sessionTriggerProviders session trigger provider per pool name
+   * @param triggerActionHandler handler handed to every per-pool validator
+   * @param triggerValidationIntervalMs initial delay and delay between validations, in milliseconds
+   */
+  PerPoolTriggerValidatorRunnable(final Map<String, SessionTriggerProvider> sessionTriggerProviders,
+    final TriggerActionHandler triggerActionHandler,
+    final long triggerValidationIntervalMs) {
+    this.sessionTriggerProviders = sessionTriggerProviders;
+    this.triggerActionHandler = triggerActionHandler;
+    this.poolValidators = new HashMap<>();
+    this.triggerValidationIntervalMs = triggerValidationIntervalMs;
+  }
+
+  /**
+   * Schedules a {@link TriggerValidatorRunnable} with fixed delay for every pool that has none yet.
+   * Any {@link Throwable} is logged and not propagated.
+   */
+  @Override
+  public void run() {
+    try {
+      for (Map.Entry<String, SessionTriggerProvider> entry : sessionTriggerProviders.entrySet()) {
+        String poolName = entry.getKey();
+        if (!poolValidators.containsKey(poolName)) {
+          LOG.info("Creating trigger validator for pool: {}", poolName);
+          TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(entry.getValue(), triggerActionHandler);
+          getValidatorExecutorService().scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs,
+            triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+          poolValidators.put(poolName, poolValidator);
+        }
+      }
+    } catch (Throwable t) {
+      // if exception is thrown in scheduled tasks, no further tasks will be scheduled, hence this ugly catch
+      LOG.warn(PerPoolTriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t);
+    }
+  }
+
+  /**
+   * Returns the executor that runs the per-pool validators, creating it on first use. Building a new
+   * executor on every {@link #run()} would leave one never-shut-down executor behind per invocation.
+   * The pool is sized from the number of providers known at creation time (at least one thread);
+   * validators for pools that show up later are scheduled on the same threads.
+   */
+  private ScheduledExecutorService getValidatorExecutorService() {
+    if (validatorExecutorService == null) {
+      validatorExecutorService = Executors.newScheduledThreadPool(Math.max(1, sessionTriggerProviders.size()));
+    }
+    return validatorExecutorService;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index be8b9a4..ad8a5c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,11 +18,6 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import javax.security.auth.login.LoginException;
-import org.apache.tez.dag.api.TezException;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -30,6 +25,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -193,10 +189,12 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
if (triggerValidatorRunnable == null) {
+ final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars
+ .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch());
triggerActionHandler = new KillTriggerActionHandler();
- triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
- startTriggerValidator(conf);
+ triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler);
+ startTriggerValidator(triggerValidationIntervalMs);
}
}
@@ -370,16 +368,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
@Override
- SessionTriggerProvider getSessionTriggerProvider() {
- return sessionTriggerProvider;
- }
-
- @Override
- TriggerActionHandler getTriggerActionHandler() {
- return triggerActionHandler;
- }
-
- @Override
TriggerValidatorRunnable getTriggerValidatorRunnable() {
return triggerValidatorRunnable;
}
@@ -520,8 +508,8 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
private void updateSessionsTriggers() {
if (sessionTriggerProvider != null && globalTriggersFetcher != null) {
- sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
- sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
+ sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions));
+ sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
}
}
@@ -552,7 +540,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
public List<String> getTriggerCounterNames() {
List<String> counterNames = new ArrayList<>();
if (sessionTriggerProvider != null) {
- List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
+ List<Trigger> activeTriggers = sessionTriggerProvider.getTriggers();
for (Trigger trigger : activeTriggers) {
counterNames.add(trigger.getExpression().getCounterLimit().getName());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6887d7a..db2848c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -73,18 +73,12 @@ class TezSessionPoolSession extends TezSessionState {
}
public static abstract class AbstractTriggerValidator {
- abstract SessionTriggerProvider getSessionTriggerProvider();
+ abstract Runnable getTriggerValidatorRunnable();
- abstract TriggerActionHandler getTriggerActionHandler();
-
- abstract TriggerValidatorRunnable getTriggerValidatorRunnable();
-
- public void startTriggerValidator(Configuration conf) {
- long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ public void startTriggerValidator(long triggerValidationIntervalMs) {
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
- TriggerValidatorRunnable triggerValidatorRunnable = getTriggerValidatorRunnable();
+ Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index 33bccbe..5821659 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
@@ -41,29 +42,51 @@ public class TriggerValidatorRunnable implements Runnable {
@Override
public void run() {
try {
- Map<TezSessionState, Trigger.Action> violatedSessions = new HashMap<>();
- final List<TezSessionState> sessions = sessionTriggerProvider.getOpenSessions();
- final List<Trigger> triggers = sessionTriggerProvider.getActiveTriggers();
- for (TezSessionState s : sessions) {
- TriggerContext triggerContext = s.getTriggerContext();
- if (triggerContext != null && !triggerContext.isQueryCompleted()) {
+ Map<TezSessionState, Trigger> violatedSessions = new HashMap<>();
+ final List<TezSessionState> sessions = sessionTriggerProvider.getSessions();
+ final List<Trigger> triggers = sessionTriggerProvider.getTriggers();
+ for (TezSessionState sessionState : sessions) {
+ TriggerContext triggerContext = sessionState.getTriggerContext();
+ if (triggerContext != null && !triggerContext.isQueryCompleted()
+ && !triggerContext.getCurrentCounters().isEmpty()) {
Map<String, Long> currentCounters = triggerContext.getCurrentCounters();
- for (Trigger t : triggers) {
- String desiredCounter = t.getExpression().getCounterLimit().getName();
+ for (Trigger currentTrigger : triggers) {
+ String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName();
// there could be interval where desired counter value is not populated by the time we make this check
if (currentCounters.containsKey(desiredCounter)) {
long currentCounterValue = currentCounters.get(desiredCounter);
- if (t.apply(currentCounterValue)) {
- String queryId = s.getTriggerContext().getQueryId();
- LOG.info("Query {} violated trigger {}. Current counter value: {}. Going to apply action {}", queryId,
- t, currentCounterValue, t.getAction());
- Trigger.Action action = t.getAction();
- String msg = "Trigger " + t + " violated. Current value: " + currentCounterValue;
- action.setMsg(msg);
- violatedSessions.put(s, action);
+ if (currentTrigger.apply(currentCounterValue)) {
+ String queryId = sessionState.getTriggerContext().getQueryId();
+ if (violatedSessions.containsKey(sessionState)) {
+ // session already has a violation
+ Trigger existingTrigger = violatedSessions.get(sessionState);
+ // KILL always takes priority over MOVE
+ if (existingTrigger.getAction().getType().equals(Action.Type.MOVE_TO_POOL) &&
+ currentTrigger.getAction().getType().equals(Action.Type.KILL_QUERY)) {
+ currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " +
+ currentCounterValue);
+ violatedSessions.put(sessionState, currentTrigger);
+ LOG.info("KILL trigger replacing MOVE for query {}", queryId);
+ } else {
+ // if multiple MOVE happens, only first move will be chosen
+ LOG.warn("Conflicting MOVE triggers ({} and {}). Choosing the first MOVE trigger: {}",
+ existingTrigger, currentTrigger, existingTrigger.getName());
+ }
+ } else {
+ // first violation for the session
+ currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " +
+ currentCounterValue);
+ violatedSessions.put(sessionState, currentTrigger);
+ }
}
}
}
+
+ Trigger chosenTrigger = violatedSessions.get(sessionState);
+ if (chosenTrigger != null) {
+ LOG.info("Query: {}. {}. Applying action.", sessionState.getTriggerContext().getQueryId(),
+ chosenTrigger.getViolationMsg());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
deleted file mode 100644
index c46569b..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.tez;
-
-import java.util.Map;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.wm.Trigger;
-import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TriggerViolationActionHandler implements TriggerActionHandler {
- private static final Logger LOG = LoggerFactory.getLogger(TriggerViolationActionHandler.class);
-
- @Override
- public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) {
- for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) {
- switch (entry.getValue()) {
- case KILL_QUERY:
- TezSessionState sessionState = entry.getKey();
- String queryId = sessionState.getTriggerContext().getQueryId();
- try {
- sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg());
- } catch (HiveException e) {
- LOG.warn("Unable to kill query {} for trigger violation");
- }
- break;
- default:
- throw new RuntimeException("Unsupported action: " + entry.getValue());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 0dd1433..6cf2aad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
@@ -188,6 +186,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
this.killReason = killReason;
}
+ /**
+ * Describes this session as the parent's description followed by its pool name and cluster fraction.
+ */
+ @Override
+ public String toString() {
+ StringBuilder description = new StringBuilder();
+ description.append(super.toString());
+ description.append(", poolName: ").append(poolName);
+ description.append(", clusterFraction: ").append(clusterFraction);
+ return description.toString();
+ }
+
private final class TimeoutRunnable implements Runnable {
@Override
public void run() {