Posted to commits@hive.apache.org by pr...@apache.org on 2017/11/13 21:50:28 UTC

[2/2] hive git commit: HIVE-17809: Implement per pool trigger validation and move sessions across pools (Prasanth Jayachandran reviewed by Sergey Shelukhin)

HIVE-17809: Implement per pool trigger validation and move sessions across pools (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/51249505
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/51249505
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/51249505

Branch: refs/heads/master
Commit: 51249505c7b3db6bfa586cb1b6dee268f2ed6ce6
Parents: 7150f9c
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Nov 13 13:48:19 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Nov 13 13:48:19 2017 -0800

----------------------------------------------------------------------
 data/conf/hive-log4j2.properties                |  11 +-
 .../hive/jdbc/AbstractJdbcTriggersTest.java     |  16 +-
 .../jdbc/TestTriggersMoveWorkloadManager.java   | 154 ++++++++
 .../hive/jdbc/TestTriggersNoTezSessionPool.java |   7 +-
 .../jdbc/TestTriggersTezSessionPoolManager.java |  53 +--
 .../hive/jdbc/TestTriggersWorkloadManager.java  |  29 +-
 .../exec/tez/KillMoveTriggerActionHandler.java  |  83 +++++
 .../ql/exec/tez/KillTriggerActionHandler.java   |  10 +-
 .../tez/PerPoolTriggerValidatorRunnable.java    |  66 ++++
 .../hive/ql/exec/tez/TezSessionPoolManager.java |  28 +-
 .../hive/ql/exec/tez/TezSessionPoolSession.java |  12 +-
 .../ql/exec/tez/TriggerValidatorRunnable.java   |  57 ++-
 .../exec/tez/TriggerViolationActionHandler.java |  48 ---
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |   7 +-
 .../hive/ql/exec/tez/WorkloadManager.java       | 348 ++++++++++++-------
 .../ql/exec/tez/WorkloadManagerFederation.java  |   2 +-
 .../org/apache/hadoop/hive/ql/wm/Action.java    | 107 ++++++
 .../hadoop/hive/ql/wm/CustomCounterLimit.java   |   4 +-
 .../hadoop/hive/ql/wm/ExecutionTrigger.java     |  22 +-
 .../apache/hadoop/hive/ql/wm/Expression.java    |   4 +-
 .../hadoop/hive/ql/wm/ExpressionFactory.java    |   2 +-
 .../hive/ql/wm/FileSystemCounterLimit.java      |   6 +-
 .../ql/wm/MetastoreGlobalTriggersFetcher.java   |  20 +-
 .../MetastoreResourcePlanTriggersFetcher.java   |  38 --
 .../hive/ql/wm/SessionTriggerProvider.java      |  33 +-
 .../hadoop/hive/ql/wm/TimeCounterLimit.java     |   4 +-
 .../org/apache/hadoop/hive/ql/wm/Trigger.java   |  46 +--
 .../hadoop/hive/ql/wm/TriggerActionHandler.java |   4 +-
 .../hadoop/hive/ql/wm/TriggerContext.java       |  12 +-
 .../hadoop/hive/ql/wm/TriggerExpression.java    |   2 +-
 .../hadoop/hive/ql/wm/TriggersFetcher.java      |  25 --
 .../hadoop/hive/ql/wm/VertexCounterLimit.java   |   4 +-
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 239 ++++++++++++-
 .../apache/hadoop/hive/ql/wm/TestTrigger.java   |  36 +-
 34 files changed, 1085 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
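For reference, the central API change in this patch is that a Trigger now carries an Action object (Action.Type.KILL_QUERY, or Action.Type.MOVE_TO_POOL with a destination pool) instead of the old Trigger.Action enum. A minimal sketch of the new construction style, using only constructors that appear in the hunks below; the class name and threshold values here are illustrative:

import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.Expression;
import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
import org.apache.hadoop.hive.ql.wm.Trigger;

public class TriggerApiSketch {
  public static void main(String[] args) {
    // Kill queries whose execution time exceeds 5 seconds
    // (previously: new ExecutionTrigger(name, expr, Trigger.Action.KILL_QUERY)).
    Expression killExpr = ExpressionFactory.fromString("EXECUTION_TIME > 5000");
    Trigger kill = new ExecutionTrigger("slow_query_kill", killExpr, new Action(Action.Type.KILL_QUERY));

    // Move queries that read more than 100 HDFS bytes to the ETL pool.
    Expression moveExpr = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
    Trigger move = new ExecutionTrigger("move_big_read", moveExpr, new Action(Action.Type.MOVE_TO_POOL, "ETL"));

    System.out.println(kill + " and " + move);
  }
}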


http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/data/conf/hive-log4j2.properties
----------------------------------------------------------------------
diff --git a/data/conf/hive-log4j2.properties b/data/conf/hive-log4j2.properties
index 73c7d90..e5bb166 100644
--- a/data/conf/hive-log4j2.properties
+++ b/data/conf/hive-log4j2.properties
@@ -50,7 +50,7 @@ appender.DRFA.strategy.type = DefaultRolloverStrategy
 appender.DRFA.strategy.max = 30
 
 # list of all loggers
-loggers = HadoopIPC, HadoopSecurity, Hdfs, HdfsServer, HadoopMetrics2, Mortbay, Yarn, YarnServer, Tez, HadoopConf, Zookeeper, ServerCnxn, NIOServerCnxn, ClientCnxn, ClientCnxnSocket, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, Operator, Serde2Lazy, ObjectStore, CalcitePlanner, AmazonAws, ApacheHttp
+loggers = HadoopIPC, HadoopSecurity, Hdfs, HdfsServer, HadoopMetrics2, Mortbay, Yarn, YarnServer, Tez, HadoopConf, Zookeeper, ServerCnxn, NIOServerCnxn, ClientCnxn, ClientCnxnSocket, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, Operator, Serde2Lazy, ObjectStore, CalcitePlanner, AmazonAws, ApacheHttp, Thrift, Jetty, BlockStateChange
 
 logger.HadoopIPC.name = org.apache.hadoop.ipc
 logger.HadoopIPC.level = WARN
@@ -127,6 +127,15 @@ logger.AmazonAws.level = INFO
 logger.ApacheHttp.name=org.apache.http
 logger.ApacheHttp.level = INFO
 
+logger.Thrift.name = org.apache.thrift
+logger.Thrift.level = WARN
+
+logger.Jetty.name = org.eclipse.jetty
+logger.Jetty.level = WARN
+
+logger.BlockStateChange.name = BlockStateChange
+logger.BlockStateChange.level = WARN
+
 # root logger
 rootLogger.level = ${sys:hive.log.level}
 rootLogger.appenderRefs = root, console

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 68d57ca..235e6c3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
@@ -102,7 +103,7 @@ public abstract class AbstractJdbcTriggersTest {
     }
   }
 
-  private void createSleepUDF() throws SQLException {
+  void createSleepUDF() throws SQLException {
     String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
     Connection con = hs2Conn;
     Statement stmt = con.createStatement();
@@ -110,7 +111,7 @@ public abstract class AbstractJdbcTriggersTest {
     stmt.close();
   }
 
-  protected void runQueryWithTrigger(final String query, final List<String> setCmds,
+  void runQueryWithTrigger(final String query, final List<String> setCmds,
     final String expect)
     throws Exception {
 
@@ -149,7 +150,7 @@ public abstract class AbstractJdbcTriggersTest {
 
   abstract void setupTriggers(final List<Trigger> triggers) throws Exception;
 
-  protected List<String> getConfigs(String... more) {
+  List<String> getConfigs(String... more) {
     List<String> setCmds = new ArrayList<>();
     setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
     setCmds.add("set mapred.min.split.size=100");
@@ -161,4 +162,11 @@ public abstract class AbstractJdbcTriggersTest {
     }
     return setCmds;
   }
+
+  WMTrigger wmTriggerFromTrigger(Trigger trigger) {
+    WMTrigger result = new WMTrigger("rp", trigger.getName());
+    result.setTriggerExpression(trigger.getExpression().toString());
+    result.setActionExpression(trigger.getAction().toString());
+    return result;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
new file mode 100644
index 0000000..a983855
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.plan;
+import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.pool;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.wm.Action;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.Expression;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+
+    String confDir = "../../data/conf/llap/";
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+
+    conf = new HiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS);
+    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default");
+    conf.setBoolean("hive.test.workload.management", true);
+    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
+    conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
+    // don't want cache hits from llap io for testing filesystem bytes read counters
+    conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
+
+    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+      + "/tez-site.xml"));
+
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
+    Map<String, String> confOverlay = new HashMap<>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerMoveAndKill() throws Exception {
+    Expression moveExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
+    Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000");
+    Trigger moveTrigger = new ExecutionTrigger("slow_query_move", moveExpression,
+      new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+    Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
+      new Action(Action.Type.KILL_QUERY));
+    setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList(killTrigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, killTrigger + " violated");
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerMoveEscapeKill() throws Exception {
+    Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+    Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000");
+    Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression,
+      new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+    Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
+      new Action(Action.Type.KILL_QUERY));
+    setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
+    String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col==t2.under_col";
+    runQueryWithTrigger(query, null, null);
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerMoveConflictKill() throws Exception {
+    Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+    Expression killExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+    Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression,
+      new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+    Trigger killTrigger = new ExecutionTrigger("kill_big_read", killExpression,
+      new Action(Action.Type.KILL_QUERY));
+    setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, killTrigger + " violated");
+  }
+
+  @Override
+  protected void setupTriggers(final List<Trigger> triggers) throws Exception {
+    setupTriggers(triggers, new ArrayList<>());
+  }
+
+  private void setupTriggers(final List<Trigger> biTriggers, final List<Trigger> etlTriggers) throws Exception {
+    WorkloadManager wm = WorkloadManager.getInstance();
+    WMPool biPool = pool("BI", 1, 0.8f);
+    WMPool etlPool = pool("ETL", 1, 0.2f);
+    WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(biPool, etlPool));
+    plan.getPlan().setDefaultPoolPath("BI");
+
+    for (Trigger trigger : biTriggers) {
+      plan.addToTriggers(wmTriggerFromTrigger(trigger));
+      plan.addToPoolTriggers(new WMPoolTrigger("BI", trigger.getName()));
+    }
+
+    for (Trigger trigger : etlTriggers) {
+      plan.addToTriggers(wmTriggerFromTrigger(trigger));
+      plan.addToPoolTriggers(new WMPoolTrigger("ETL", trigger.getName()));
+    }
+    wm.updateResourcePlanAsync(plan).get(10, TimeUnit.SECONDS);
+  }
+}
\ No newline at end of file
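The setupTriggers helper above is the new per-pool registration surface: each trigger is added to the resource plan as a WMTrigger and then attached to a pool by name with a WMPoolTrigger. A condensed sketch of just that registration step, assuming an existing WMFullResourcePlan; the helper class and method names below are hypothetical:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
import org.apache.hadoop.hive.ql.wm.Trigger;

class PoolTriggerSetupSketch {
  // Register a trigger in the plan and attach it to one pool.
  static void addPoolTrigger(WMFullResourcePlan plan, String poolName, Trigger trigger) {
    WMTrigger wmTrigger = new WMTrigger("rp", trigger.getName()); // "rp" = resource plan name, as in the test helper
    wmTrigger.setTriggerExpression(trigger.getExpression().toString());
    wmTrigger.setActionExpression(trigger.getAction().toString());
    plan.addToTriggers(wmTrigger);
    plan.addToPoolTriggers(new WMPoolTrigger(poolName, trigger.getName())); // per-pool attachment
  }

  // Push the updated plan to the workload manager, as the tests do.
  static void apply(WMFullResourcePlan plan) throws Exception {
    WorkloadManager.getInstance().updateResourcePlanAsync(plan).get(10, TimeUnit.SECONDS);
  }
}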

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
index fb3af932..bcce3dc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.Expression;
 import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
@@ -36,7 +37,7 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest {
   @Test(timeout = 60000)
   public void testTriggerSlowQueryExecutionTime() throws Exception {
     Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
-    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
@@ -46,7 +47,7 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest {
   @Test(timeout = 60000)
   public void testTriggerTotalTasks() throws Exception {
     Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50");
-    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index 7b87a65..b377275 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.Expression;
 import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
@@ -37,7 +38,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testTriggerSlowQueryElapsedTime() throws Exception {
     Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 20000");
-    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
@@ -47,7 +48,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testTriggerSlowQueryExecutionTime() throws Exception {
     Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
-    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
@@ -57,7 +58,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testTriggerHighShuffleBytes() throws Exception {
     Expression expression = ExpressionFactory.fromString("SHUFFLE_BYTES > 100");
-    Trigger trigger = new ExecutionTrigger("big_shuffle", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("big_shuffle", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     List<String> cmds = new ArrayList<>();
     cmds.add("set hive.auto.convert.join=false");
@@ -72,7 +73,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testTriggerHighBytesRead() throws Exception {
     Expression expression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
-    Trigger trigger = new ExecutionTrigger("big_read", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("big_read", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
@@ -82,7 +83,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testTriggerHighBytesWrite() throws Exception {
     Expression expression = ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100");
-    Trigger trigger = new ExecutionTrigger("big_write", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("big_write", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
@@ -92,7 +93,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testTriggerTotalTasks() throws Exception {
     Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50");
-    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
@@ -102,11 +103,11 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testTriggerCustomReadOps() throws Exception {
     Expression expression = ExpressionFactory.fromString("HDFS_READ_OPS > 50");
-    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
-    runQueryWithTrigger(query, getConfigs(), "Query was cancelled");
+    runQueryWithTrigger(query, getConfigs(), trigger + " violated");
   }
 
   @Test(timeout = 120000)
@@ -114,20 +115,20 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
     List<String> cmds = getConfigs();
 
     Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5");
-    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query = "create table testtab2 as select * from " + tableName;
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
 
     // partitioned insert
     expression = ExpressionFactory.fromString("CREATED_FILES > 10");
-    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     cmds.add("drop table src3");
     cmds.add("create table src3 (key int) partitioned by (value string)");
     query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName +
       " where under_col < 100";
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
   }
 
   @Test(timeout = 240000)
@@ -140,9 +141,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
     String query =
       "insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100";
     Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
-    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
 
     cmds = getConfigs();
     // let it create 57 partitions without any triggers
@@ -154,9 +155,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
     // query will try to add 64 more partitions to already existing 57 partitions but will get cancelled for violation
     query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
     expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30");
-    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
 
     // let it create 64 more partitions (total 57 + 64 = 121) without any triggers
     query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
@@ -166,7 +167,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
     // re-run insert into but this time no new partitions will be created, so there will be no violation
     query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
     expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10");
-    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     runQueryWithTrigger(query, cmds, null);
   }
@@ -184,9 +185,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
         " insert overwrite table src2 partition (value) select * where under_col < 100 " +
         " insert overwrite table src3 partition (value) select * where under_col >= 100 and under_col < 200";
     Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70");
-    Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
   }
 
   @Test(timeout = 60000)
@@ -203,15 +204,15 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
         "union all " +
         "select * from " + tableName + " where under_col >= 100 and under_col < 200) temps";
     Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70");
-    Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
   }
 
   @Test(timeout = 60000)
   public void testTriggerCustomNonExistent() throws Exception {
     Expression expression = ExpressionFactory.fromString("OPEN_FILES > 50");
-    Trigger trigger = new ExecutionTrigger("non_existent", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("non_existent", expression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(trigger));
     String query =
       "select l.under_col, l.value from " + tableName + " l join " + tableName + " r on l.under_col>=r.under_col";
@@ -221,9 +222,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testMultipleTriggers1() throws Exception {
     Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 1000000");
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY));
     Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
@@ -233,9 +234,9 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest
   @Test(timeout = 60000)
   public void testMultipleTriggers2() throws Exception {
     Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY));
     Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 100000");
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY));
     setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index 79ba1f4..0506f67 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,44 +16,37 @@
 
 package org.apache.hive.jdbc;
 
-import org.slf4j.Logger;
-
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
 import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
 import org.junit.BeforeClass;
 
+import com.google.common.collect.Lists;
+
 public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager {
-  private final static Logger LOG = LoggerFactory.getLogger(TestTriggersWorkloadManager.class);
 
   @BeforeClass
   public static void beforeTest() throws Exception {
     Class.forName(MiniHS2.getJdbcDriverName());
 
     String confDir = "../../data/conf/llap/";
-    if (confDir != null && !confDir.isEmpty()) {
-      HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
-      System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
-    }
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
 
     conf = new HiveConf();
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
@@ -73,7 +66,7 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
 
-    Map<String, String> confOverlay = new HashMap<String, String>();
+    Map<String, String> confOverlay = new HashMap<>();
     miniHS2.start(confOverlay);
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
   }
@@ -93,12 +86,4 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
     }
     wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
   }
-
-  private WMTrigger wmTriggerFromTrigger(Trigger trigger) {
-    WMTrigger result = new WMTrigger("rp", trigger.getName());
-    result.setTriggerExpression(trigger.getExpression().toString()); // TODO: hmm
-    result.setActionExpression(trigger.getAction().toString()); // TODO: hmm
-    LOG.debug("Produced " + result + " from " + trigger);
-    return result;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
new file mode 100644
index 0000000..94b189b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KillMoveTriggerActionHandler implements TriggerActionHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(KillMoveTriggerActionHandler.class);
+  private final WorkloadManager wm;
+
+  KillMoveTriggerActionHandler(final WorkloadManager wm) {
+    this.wm = wm;
+  }
+
+  @Override
+  public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
+    TezSessionState sessionState;
+    Map<WmTezSession, Future<Boolean>> moveFutures = new HashMap<>(queriesViolated.size());
+    for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+      switch (entry.getValue().getAction().getType()) {
+        case KILL_QUERY:
+          sessionState = entry.getKey();
+          String queryId = sessionState.getTriggerContext().getQueryId();
+          try {
+            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
+          } catch (HiveException e) {
+            LOG.warn("Unable to kill query {} for trigger violation", queryId);
+          }
+          break;
+        case MOVE_TO_POOL:
+          sessionState = entry.getKey();
+          if (sessionState instanceof WmTezSession) {
+            WmTezSession wmTezSession = (WmTezSession) sessionState;
+            String destPoolName = entry.getValue().getAction().getPoolName();
+            Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
+            moveFutures.put(wmTezSession, moveFuture);
+          } else {
+            throw new RuntimeException("WmTezSession is expected. Got: " + sessionState.getClass().getSimpleName() +
+              ". SessionId: " + sessionState.getSessionId());
+          }
+          break;
+        default:
+          throw new RuntimeException("Unsupported action: " + entry.getValue());
+      }
+    }
+
+    for (Map.Entry<WmTezSession, Future<Boolean>> entry : moveFutures.entrySet()) {
+      WmTezSession wmTezSession = entry.getKey();
+      Future<Boolean> moveFuture = entry.getValue();
+      try {
+        // block to make sure move happened successfully
+        if (moveFuture.get()) {
+          LOG.info("Moved session {} to pool {}", wmTezSession.getSessionId(), wmTezSession.getPoolName());
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Exception while moving session {}", wmTezSession.getSessionId(), e);
+      }
+    }
+  }
+}
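A short sketch of how this handler is driven: the trigger validators hand it a map of sessions to the single trigger chosen for each session, and applyAction either kills the query or asks the WorkloadManager to move the session and then blocks on the returned future. The class below is illustrative and assumes same-package access, since the constructor is package-private:

package org.apache.hadoop.hive.ql.exec.tez;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;

class KillMoveHandlerUsageSketch {
  // wmSession and violatedTrigger are illustrative; in the real flow they come from
  // the per-pool trigger validators.
  static void handle(WorkloadManager wm, WmTezSession wmSession, Trigger violatedTrigger) {
    TriggerActionHandler handler = new KillMoveTriggerActionHandler(wm);
    Map<TezSessionState, Trigger> violations = new HashMap<>();
    violations.put(wmSession, violatedTrigger); // e.g. an Action.Type.MOVE_TO_POOL violation
    handler.applyAction(violations);            // kill, or move and wait for the move future
  }
}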

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 474fae9..8c60b6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,9 +32,9 @@ public class KillTriggerActionHandler implements TriggerActionHandler {
   private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class);
 
   @Override
-  public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) {
-    for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) {
-      switch (entry.getValue()) {
+  public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
+    for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+      switch (entry.getValue().getAction().getType()) {
         case KILL_QUERY:
           TezSessionState sessionState = entry.getKey();
           String queryId = sessionState.getTriggerContext().getQueryId();
@@ -42,7 +42,7 @@ public class KillTriggerActionHandler implements TriggerActionHandler {
             KillQuery killQuery = sessionState.getKillQuery();
             // if kill query is null then session might have been released to pool or closed already
             if (killQuery != null) {
-              sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg());
+              sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
             }
           } catch (HiveException e) {
             LOG.warn("Unable to kill query {} for trigger violation");

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
new file mode 100644
index 0000000..8f29197
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
+import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerPoolTriggerValidatorRunnable implements Runnable {
+  protected static transient Logger LOG = LoggerFactory.getLogger(PerPoolTriggerValidatorRunnable.class);
+  private final Map<String, SessionTriggerProvider> sessionTriggerProviders;
+  private final TriggerActionHandler triggerActionHandler;
+  private final Map<String, TriggerValidatorRunnable> poolValidators;
+  private final long triggerValidationIntervalMs;
+
+  PerPoolTriggerValidatorRunnable(final Map<String, SessionTriggerProvider> sessionTriggerProviders,
+    final TriggerActionHandler triggerActionHandler,
+    final long triggerValidationIntervalMs) {
+    this.sessionTriggerProviders = sessionTriggerProviders;
+    this.triggerActionHandler = triggerActionHandler;
+    this.poolValidators = new HashMap<>();
+    this.triggerValidationIntervalMs = triggerValidationIntervalMs;
+  }
+
+  @Override
+  public void run() {
+    try {
+      ScheduledExecutorService validatorExecutorService = Executors
+        .newScheduledThreadPool(sessionTriggerProviders.size());
+      for (Map.Entry<String, SessionTriggerProvider> entry : sessionTriggerProviders.entrySet()) {
+        String poolName = entry.getKey();
+        if (!poolValidators.containsKey(poolName)) {
+          LOG.info("Creating trigger validator for pool: {}", poolName);
+          TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(entry.getValue(), triggerActionHandler);
+          validatorExecutorService.scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs,
+            triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+          poolValidators.put(poolName, poolValidator);
+        }
+      }
+    } catch (Throwable t) {
+      // if exception is thrown in scheduled tasks, no further tasks will be scheduled, hence this ugly catch
+      LOG.warn(PerPoolTriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t);
+    }
+  }
+}
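How this outer runnable gets wired is not visible in this hunk (it happens in WorkloadManager); the sketch below shows the shape of that wiring under stated assumptions: one SessionTriggerProvider per pool, a single KillMoveTriggerActionHandler, and the resulting runnable handed to AbstractTriggerValidator.startTriggerValidator. Names other than those from this patch are illustrative:

package org.apache.hadoop.hive.ql.exec.tez;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;

class PerPoolValidatorWiringSketch {
  static Runnable build(WorkloadManager wm, Map<String, List<TezSessionState>> sessionsPerPool,
      Map<String, List<Trigger>> triggersPerPool, long validationIntervalMs) {
    Map<String, SessionTriggerProvider> providers = new HashMap<>();
    for (Map.Entry<String, List<TezSessionState>> e : sessionsPerPool.entrySet()) {
      providers.put(e.getKey(), new SessionTriggerProvider(e.getValue(), triggersPerPool.get(e.getKey())));
    }
    TriggerActionHandler handler = new KillMoveTriggerActionHandler(wm);
    // Each run of this runnable creates and schedules one TriggerValidatorRunnable
    // per pool it has not seen before.
    return new PerPoolTriggerValidatorRunnable(providers, handler, validationIntervalMs);
  }
}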

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index be8b9a4..ad8a5c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import javax.security.auth.login.LoginException;
-import org.apache.tez.dag.api.TezException;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -193,10 +189,12 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     }
 
     if (triggerValidatorRunnable == null) {
+      final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars
+        .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
       sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch());
       triggerActionHandler = new KillTriggerActionHandler();
-      triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
-      startTriggerValidator(conf);
+      triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler);
+      startTriggerValidator(triggerValidationIntervalMs);
     }
   }
 
@@ -370,16 +368,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   }
 
   @Override
-  SessionTriggerProvider getSessionTriggerProvider() {
-    return sessionTriggerProvider;
-  }
-
-  @Override
-  TriggerActionHandler getTriggerActionHandler() {
-    return triggerActionHandler;
-  }
-
-  @Override
   TriggerValidatorRunnable getTriggerValidatorRunnable() {
     return triggerValidatorRunnable;
   }
@@ -520,8 +508,8 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
 
   private void updateSessionsTriggers() {
     if (sessionTriggerProvider != null && globalTriggersFetcher != null) {
-      sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
-      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
+      sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions));
+      sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
     }
   }
 
@@ -552,7 +540,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   public List<String> getTriggerCounterNames() {
     List<String> counterNames = new ArrayList<>();
     if (sessionTriggerProvider != null) {
-      List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
+      List<Trigger> activeTriggers = sessionTriggerProvider.getTriggers();
       for (Trigger trigger : activeTriggers) {
         counterNames.add(trigger.getExpression().getCounterLimit().getName());
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6887d7a..db2848c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -73,18 +73,12 @@ class TezSessionPoolSession extends TezSessionState {
   }
 
   public static abstract class AbstractTriggerValidator {
-    abstract SessionTriggerProvider getSessionTriggerProvider();
+    abstract Runnable getTriggerValidatorRunnable();
 
-    abstract TriggerActionHandler getTriggerActionHandler();
-
-    abstract TriggerValidatorRunnable getTriggerValidatorRunnable();
-
-    public void startTriggerValidator(Configuration conf) {
-      long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    public void startTriggerValidator(long triggerValidationIntervalMs) {
       final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
-      TriggerValidatorRunnable triggerValidatorRunnable = getTriggerValidatorRunnable();
+      Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
       scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
         triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index 33bccbe..5821659 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
@@ -41,29 +42,51 @@ public class TriggerValidatorRunnable implements Runnable {
   @Override
   public void run() {
     try {
-      Map<TezSessionState, Trigger.Action> violatedSessions = new HashMap<>();
-      final List<TezSessionState> sessions = sessionTriggerProvider.getOpenSessions();
-      final List<Trigger> triggers = sessionTriggerProvider.getActiveTriggers();
-      for (TezSessionState s : sessions) {
-        TriggerContext triggerContext = s.getTriggerContext();
-        if (triggerContext != null && !triggerContext.isQueryCompleted()) {
+      Map<TezSessionState, Trigger> violatedSessions = new HashMap<>();
+      final List<TezSessionState> sessions = sessionTriggerProvider.getSessions();
+      final List<Trigger> triggers = sessionTriggerProvider.getTriggers();
+      for (TezSessionState sessionState : sessions) {
+        TriggerContext triggerContext = sessionState.getTriggerContext();
+        if (triggerContext != null && !triggerContext.isQueryCompleted()
+          && !triggerContext.getCurrentCounters().isEmpty()) {
           Map<String, Long> currentCounters = triggerContext.getCurrentCounters();
-          for (Trigger t : triggers) {
-            String desiredCounter = t.getExpression().getCounterLimit().getName();
+          for (Trigger currentTrigger : triggers) {
+            String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName();
             // there could be interval where desired counter value is not populated by the time we make this check
             if (currentCounters.containsKey(desiredCounter)) {
               long currentCounterValue = currentCounters.get(desiredCounter);
-              if (t.apply(currentCounterValue)) {
-                String queryId = s.getTriggerContext().getQueryId();
-                LOG.info("Query {} violated trigger {}. Current counter value: {}. Going to apply action {}", queryId,
-                  t, currentCounterValue, t.getAction());
-                Trigger.Action action = t.getAction();
-                String msg = "Trigger " + t + " violated. Current value: " + currentCounterValue;
-                action.setMsg(msg);
-                violatedSessions.put(s, action);
+              if (currentTrigger.apply(currentCounterValue)) {
+                String queryId = sessionState.getTriggerContext().getQueryId();
+                if (violatedSessions.containsKey(sessionState)) {
+                  // session already has a violation
+                  Trigger existingTrigger = violatedSessions.get(sessionState);
+                  // KILL always takes priority over MOVE
+                  if (existingTrigger.getAction().getType().equals(Action.Type.MOVE_TO_POOL) &&
+                    currentTrigger.getAction().getType().equals(Action.Type.KILL_QUERY)) {
+                    currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " +
+                      currentCounterValue);
+                    violatedSessions.put(sessionState, currentTrigger);
+                    LOG.info("KILL trigger replacing MOVE for query {}", queryId);
+                  } else {
+                    // if multiple MOVE happens, only first move will be chosen
+                    LOG.warn("Conflicting MOVE triggers ({} and {}). Choosing the first MOVE trigger: {}",
+                      existingTrigger, currentTrigger, existingTrigger.getName());
+                  }
+                } else {
+                  // first violation for the session
+                  currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " +
+                    currentCounterValue);
+                  violatedSessions.put(sessionState, currentTrigger);
+                }
               }
             }
           }
+
+          Trigger chosenTrigger = violatedSessions.get(sessionState);
+          if (chosenTrigger != null) {
+            LOG.info("Query: {}. {}. Applying action.", sessionState.getTriggerContext().getQueryId(),
+              chosenTrigger.getViolationMsg());
+          }
         }
       }
 
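
The loop above resolves conflicts when one session violates several triggers in the same validation pass: a KILL_QUERY trigger displaces an earlier MOVE_TO_POOL trigger, otherwise the first violated trigger is kept (so with two MOVE triggers the first one wins). The same rule written as a standalone helper, for illustration only:

import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.Trigger;

class TriggerConflictSketch {
  // Pick the trigger to act on for a session that violated more than one trigger.
  static Trigger resolve(Trigger existing, Trigger current) {
    if (existing == null) {
      return current;  // first violation for this session
    }
    if (existing.getAction().getType() == Action.Type.MOVE_TO_POOL
        && current.getAction().getType() == Action.Type.KILL_QUERY) {
      return current;  // KILL always takes priority over MOVE
    }
    return existing;   // otherwise keep the first violated trigger (first MOVE wins)
  }
}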

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
deleted file mode 100644
index c46569b..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.tez;
-
-import java.util.Map;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.wm.Trigger;
-import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TriggerViolationActionHandler implements TriggerActionHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(TriggerViolationActionHandler.class);
-
-  @Override
-  public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) {
-    for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) {
-      switch (entry.getValue()) {
-        case KILL_QUERY:
-          TezSessionState sessionState = entry.getKey();
-          String queryId = sessionState.getTriggerContext().getQueryId();
-          try {
-            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg());
-          } catch (HiveException e) {
-            LOG.warn("Unable to kill query {} for trigger violation");
-          }
-          break;
-        default:
-          throw new RuntimeException("Unsupported action: " + entry.getValue());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/51249505/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 0dd1433..6cf2aad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.hadoop.hive.ql.exec.Utilities;
-
 import com.google.common.annotations.VisibleForTesting;
 
 import com.google.common.util.concurrent.SettableFuture;
@@ -188,6 +186,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     this.killReason = killReason;
   }
 
+  @Override
+  public String toString() {
+    return super.toString() + ", poolName: " + poolName + ", clusterFraction: " + clusterFraction;
+  }
+
   private final class TimeoutRunnable implements Runnable {
     @Override
     public void run() {