You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/12/06 08:31:00 UTC

[2/2] hive git commit: HIVE-18088: Add WM event traces at query level for debugging (Prasanth Jayachandran reviewed by Sergey Shelukhin)

HIVE-18088: Add WM event traces at query level for debugging (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3060b30
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3060b30
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3060b30

Branch: refs/heads/master
Commit: a3060b30705d1bd55c6be5357f7575534c84e6b0
Parents: 0d83233
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Dec 6 00:30:42 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Dec 6 00:30:42 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 itests/hive-unit/pom.xml                        |  18 +-
 .../hive/jdbc/AbstractJdbcTriggersTest.java     |  73 ++++--
 .../jdbc/TestTriggersMoveWorkloadManager.java   | 109 ++++++++-
 .../java/org/apache/hadoop/hive/ql/Context.java |  12 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   6 +-
 .../hadoop/hive/ql/exec/tez/AmPluginNode.java   |  13 +-
 .../exec/tez/KillMoveTriggerActionHandler.java  |  51 ++--
 .../ql/exec/tez/KillTriggerActionHandler.java   |   4 +-
 .../hive/ql/exec/tez/TezSessionState.java       |  22 +-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  33 +--
 .../ql/exec/tez/TriggerValidatorRunnable.java   |  14 +-
 .../apache/hadoop/hive/ql/exec/tez/WmEvent.java | 127 ++++++++++
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |  40 +++-
 .../hive/ql/exec/tez/WorkloadManager.java       | 234 +++++++++++++------
 .../ql/exec/tez/WorkloadManagerFederation.java  |  30 ++-
 .../ql/exec/tez/monitoring/PrintSummary.java    |   2 +-
 .../ql/exec/tez/monitoring/TezJobMonitor.java   |  27 +--
 .../hooks/PostExecWMEventsSummaryPrinter.java   |  62 +++++
 .../org/apache/hadoop/hive/ql/wm/Trigger.java   |   3 +
 .../hadoop/hive/ql/wm/TriggerActionHandler.java |   4 +-
 .../hadoop/hive/ql/wm/TriggerContext.java       |  74 ------
 .../org/apache/hadoop/hive/ql/wm/WmContext.java | 233 ++++++++++++++++++
 .../hive/ql/exec/tez/TestWorkloadManager.java   |  96 ++++----
 24 files changed, 964 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3be5a8d..75b7707 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3052,6 +3052,10 @@ public class HiveConf extends Configuration {
         "hive.tez.exec.print.summary",
         false,
         "Display breakdown of execution steps, for every query executed by the shell."),
+    TEZ_SESSION_EVENTS_SUMMARY(
+      "hive.tez.session.events.print.summary",
+      "none", new StringSet("none", "text", "json"),
+      "Display summary of all tez sessions related events in text or json format"),
     TEZ_EXEC_INPLACE_PROGRESS(
         "hive.tez.exec.inplace.progress",
         true,

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index ea5b7b9..3a435a8 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -361,7 +361,23 @@
       <version>${tez.version}</version>
       <scope>test</scope>
         <exclusions>
-             <exclusion>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+        <exclusions>
+          <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
           </exclusion>

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 235e6c3..62ee66f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -21,7 +21,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.PrintStream;
 import java.net.URL;
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -103,7 +105,7 @@ public abstract class AbstractJdbcTriggersTest {
     }
   }
 
-  void createSleepUDF() throws SQLException {
+  private void createSleepUDF() throws SQLException {
     String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
     Connection con = hs2Conn;
     Statement stmt = con.createStatement();
@@ -112,40 +114,65 @@ public abstract class AbstractJdbcTriggersTest {
   }
 
   void runQueryWithTrigger(final String query, final List<String> setCmds,
-    final String expect)
+    final String expect) throws Exception {
+    runQueryWithTrigger(query, setCmds, expect, null);
+  }
+
+  void runQueryWithTrigger(final String query, final List<String> setCmds,
+    final String expect, final List<String> errCaptureExpect)
     throws Exception {
 
     Connection con = hs2Conn;
     TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
     createSleepUDF();
 
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setErr(new PrintStream(baos)); // capture stderr
     final Statement selStmt = con.createStatement();
     final Throwable[] throwable = new Throwable[1];
-    Thread queryThread = new Thread(() -> {
-      try {
-        if (setCmds != null) {
-          for (String setCmd : setCmds) {
-            selStmt.execute(setCmd);
+    try {
+      Thread queryThread = new Thread(() -> {
+        try {
+          if (setCmds != null) {
+            for (String setCmd : setCmds) {
+              selStmt.execute(setCmd);
+            }
           }
+          selStmt.execute(query);
+        } catch (SQLException e) {
+          throwable[0] = e;
+        }
+      });
+      queryThread.start();
+
+      queryThread.join();
+      selStmt.close();
+
+      if (expect == null) {
+        assertNull("Expected query to succeed", throwable[0]);
+      } else {
+        assertNotNull("Expected non-null throwable", throwable[0]);
+        assertEquals(SQLException.class, throwable[0].getClass());
+        assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
+          throwable[0].getMessage().contains(expect));
+      }
+
+      if (errCaptureExpect != null && !errCaptureExpect.isEmpty()) {
+        // Failure hooks run after HiveStatement is closed; wait some time for the failure hook to execute.
+        String stdErrStr = "";
+        while (!stdErrStr.contains(errCaptureExpect.get(0))) {
+          baos.flush();
+          stdErrStr = baos.toString();
+          Thread.sleep(500);
+        }
+        for (String errExpect : errCaptureExpect) {
+          assertTrue("'" + errExpect + "' expected in STDERR capture, but not found.", stdErrStr.contains(errExpect));
         }
-        selStmt.execute(query);
-      } catch (SQLException e) {
-        throwable[0] = e;
       }
-    });
-    queryThread.start();
-
-    queryThread.join();
-    selStmt.close();
-
-    if (expect == null) {
-      assertNull("Expected query to succeed", throwable[0]);
-    } else {
-      assertNotNull("Expected non-null throwable", throwable[0]);
-      assertEquals(SQLException.class, throwable[0].getClass());
-      assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
-        throwable[0].getMessage().contains(expect));
+    } finally {
+      baos.close();
     }
+
   }
 
   abstract void setupTriggers(final List<Trigger> triggers) throws Exception;

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
index a983855..74ca958 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
@@ -18,16 +18,9 @@ package org.apache.hive.jdbc;
 
 import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.plan;
 import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.pool;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.net.URL;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -94,10 +87,30 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
       new Action(Action.Type.MOVE_TO_POOL, "ETL"));
     Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
       new Action(Action.Type.KILL_QUERY));
-    setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList(killTrigger));
+    setupTriggers(Lists.newArrayList(moveTrigger), Lists.newArrayList(killTrigger));
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
-    runQueryWithTrigger(query, null, killTrigger + " violated");
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.tez.session.events.print.summary=json");
+    setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    List<String> errCaptureExpect = new ArrayList<>();
+    errCaptureExpect.add("Workload Manager Events Summary");
+    errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+    errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00");
+    errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00");
+    errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+    errCaptureExpect.add("\"eventType\" : \"GET\"");
+    errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+    errCaptureExpect.add("\"eventType\" : \"KILL\"");
+    errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+    errCaptureExpect.add("\"name\" : \"slow_query_move\"");
+    errCaptureExpect.add("\"name\" : \"slow_query_kill\"");
+    // violation in BI queue
+    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger + " violated");
+    // violation in ETL queue
+    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated");
+    runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
   }
 
   @Test(timeout = 60000)
@@ -111,7 +124,65 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
     setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
     String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col==t2.under_col";
-    runQueryWithTrigger(query, null, null);
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.tez.session.events.print.summary=json");
+    setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    List<String> errCaptureExpect = new ArrayList<>();
+    errCaptureExpect.add("Workload Manager Events Summary");
+    errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+    errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00");
+    errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+    errCaptureExpect.add("\"eventType\" : \"GET\"");
+    errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+    errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+    errCaptureExpect.add("\"name\" : \"move_big_read\"");
+    errCaptureExpect.add("\"name\" : \"slow_query_kill\"");
+    // violation in BI queue
+    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger + " violated");
+    runQueryWithTrigger(query, setCmds, null, errCaptureExpect);
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerMoveBackKill() throws Exception {
+    Expression moveExpression1 = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+    Expression moveExpression2 = ExpressionFactory.fromString("SHUFFLE_BYTES > 200");
+    Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 2000");
+    Trigger moveTrigger1 = new ExecutionTrigger("move_big_read", moveExpression1,
+      new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+    Trigger moveTrigger2 = new ExecutionTrigger("move_high", moveExpression2,
+      new Action(Action.Type.MOVE_TO_POOL, "BI"));
+    Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
+      new Action(Action.Type.KILL_QUERY));
+    setupTriggers(Lists.newArrayList(moveTrigger1, killTrigger), Lists.newArrayList(moveTrigger2));
+    String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.tez.session.events.print.summary=json");
+    setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    List<String> errCaptureExpect = new ArrayList<>();
+    errCaptureExpect.add("Workload Manager Events Summary");
+    errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+    errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00");
+    errCaptureExpect.add("Event: MOVE Pool: BI Cluster %: 80.00");
+    errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00");
+    errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+    errCaptureExpect.add("\"eventType\" : \"GET\"");
+    errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+    errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+    errCaptureExpect.add("\"eventType\" : \"KILL\"");
+    errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+    errCaptureExpect.add("\"name\" : \"move_big_read\"");
+    errCaptureExpect.add("\"name\" : \"slow_query_kill\"");
+    errCaptureExpect.add("\"name\" : \"move_high\"");
+    // violation in BI queue
+    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger1 + " violated");
+    // violation in ETL queue
+    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger2 + " violated");
+    // violation in BI queue
+    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated");
+    runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
   }
 
   @Test(timeout = 60000)
@@ -125,7 +196,23 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
     setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
     String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
       " t2 on t1.under_col>=t2.under_col";
-    runQueryWithTrigger(query, null, killTrigger + " violated");
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.tez.session.events.print.summary=json");
+    setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+    List<String> errCaptureExpect = new ArrayList<>();
+    errCaptureExpect.add("Workload Manager Events Summary");
+    errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+    errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00");
+    errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+    errCaptureExpect.add("\"eventType\" : \"GET\"");
+    errCaptureExpect.add("\"eventType\" : \"KILL\"");
+    errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+    errCaptureExpect.add("\"name\" : \"move_big_read\"");
+    errCaptureExpect.add("\"name\" : \"kill_big_read\"");
+    // violation in BI queue
+    errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated");
+    runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 57e1803..6d48783 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -150,18 +150,18 @@ public class Context {
    */
   private Map<Integer, DestClausePrefix> insertBranchToNamePrefix = new HashMap<>();
   private Operation operation = Operation.OTHER;
-  private TriggerContext triggerContext;
+  private WmContext wmContext;
 
   public void setOperation(Operation operation) {
     this.operation = operation;
   }
 
-  public TriggerContext getTriggerContext() {
-    return triggerContext;
+  public WmContext getWmContext() {
+    return wmContext;
   }
 
-  public void setTriggerContext(final TriggerContext triggerContext) {
-    this.triggerContext = triggerContext;
+  public void setWmContext(final WmContext wmContext) {
+    this.wmContext = wmContext;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 389a1a6..4d52d74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -113,7 +113,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObje
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
@@ -732,8 +732,8 @@ public class Driver implements CommandProcessor {
     } else {
       queryStartTime = queryDisplay.getQueryStartTime();
     }
-    TriggerContext triggerContext = new TriggerContext(queryStartTime, queryId);
-    ctx.setTriggerContext(triggerContext);
+    WmContext wmContext = new WmContext(queryStartTime, queryId);
+    ctx.setWmContext(wmContext);
   }
 
   private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
index 0509cbc..eb64421 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
@@ -18,18 +18,21 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hive.common.util.Ref;
-
-import java.util.concurrent.TimeoutException;
-
 import org.apache.hadoop.security.token.Token;
+import org.apache.hive.common.util.Ref;
 import org.apache.tez.common.security.JobTokenIdentifier;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
 
+@JsonSerialize
 public interface AmPluginNode {
-  public static class AmPluginInfo {
+  class AmPluginInfo {
+    @JsonProperty("amPluginPort")
     public final int amPluginPort;
     public final Token<JobTokenIdentifier> amPluginToken;
     public final String amPluginTokenJobId;
+    @JsonProperty("amHost")
     public final String amHost;
 
     AmPluginInfo(String amHost, int amPluginPort,

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
index 94b189b..b16f1c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
@@ -21,13 +21,12 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KillMoveTriggerActionHandler implements TriggerActionHandler {
+public class KillMoveTriggerActionHandler implements TriggerActionHandler<WmTezSession> {
   private static final Logger LOG = LoggerFactory.getLogger(KillMoveTriggerActionHandler.class);
   private final WorkloadManager wm;
 
@@ -36,31 +35,20 @@ public class KillMoveTriggerActionHandler implements TriggerActionHandler {
   }
 
   @Override
-  public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
-    TezSessionState sessionState;
-    Map<WmTezSession, Future<Boolean>> moveFutures = new HashMap<>(queriesViolated.size());
-    for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+  public void applyAction(final Map<WmTezSession, Trigger> queriesViolated) {
+    Map<WmTezSession, Future<Boolean>> moveFutures = new HashMap<>();
+    Map<WmTezSession, Future<Boolean>> killFutures = new HashMap<>();
+    for (Map.Entry<WmTezSession, Trigger> entry : queriesViolated.entrySet()) {
+      WmTezSession wmTezSession = entry.getKey();
       switch (entry.getValue().getAction().getType()) {
         case KILL_QUERY:
-          sessionState = entry.getKey();
-          String queryId = sessionState.getTriggerContext().getQueryId();
-          try {
-            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
-          } catch (HiveException e) {
-            LOG.warn("Unable to kill query {} for trigger violation");
-          }
+          Future<Boolean> killFuture = wm.applyKillSessionAsync(wmTezSession, entry.getValue().getViolationMsg());
+          killFutures.put(wmTezSession, killFuture);
           break;
         case MOVE_TO_POOL:
-          sessionState = entry.getKey();
-          if (sessionState instanceof WmTezSession) {
-            WmTezSession wmTezSession = (WmTezSession) sessionState;
-            String destPoolName = entry.getValue().getAction().getPoolName();
-            Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
-            moveFutures.put(wmTezSession, moveFuture);
-          } else {
-            throw new RuntimeException("WmTezSession is expected. Got: " + sessionState.getClass().getSimpleName() +
-              ". SessionId: " + sessionState.getSessionId());
-          }
+          String destPoolName = entry.getValue().getAction().getPoolName();
+          Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
+          moveFutures.put(wmTezSession, moveFuture);
           break;
         default:
           throw new RuntimeException("Unsupported action: " + entry.getValue());
@@ -69,15 +57,28 @@ public class KillMoveTriggerActionHandler implements TriggerActionHandler {
 
     for (Map.Entry<WmTezSession, Future<Boolean>> entry : moveFutures.entrySet()) {
       WmTezSession wmTezSession = entry.getKey();
-      Future<Boolean> moveFuture = entry.getValue();
+      Future<Boolean> future = entry.getValue();
       try {
         // block to make sure move happened successfully
-        if (moveFuture.get()) {
+        if (future.get()) {
           LOG.info("Moved session {} to pool {}", wmTezSession.getSessionId(), wmTezSession.getPoolName());
         }
       } catch (InterruptedException | ExecutionException e) {
         LOG.error("Exception while moving session {}", wmTezSession.getSessionId(), e);
       }
     }
+
+    for (Map.Entry<WmTezSession, Future<Boolean>> entry : killFutures.entrySet()) {
+      WmTezSession wmTezSession = entry.getKey();
+      Future<Boolean> future = entry.getValue();
+      try {
+        // block to make sure kill happened successfully
+        if (future.get()) {
+          LOG.info("Killed session {}", wmTezSession.getSessionId());
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Exception while killing session {}", wmTezSession.getSessionId(), e);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 8c60b6f..50d234d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Handles only Kill Action.
  */
-public class KillTriggerActionHandler implements TriggerActionHandler {
+public class KillTriggerActionHandler implements TriggerActionHandler<TezSessionState> {
   private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class);
 
   @Override
@@ -37,7 +37,7 @@ public class KillTriggerActionHandler implements TriggerActionHandler {
       switch (entry.getValue().getAction().getType()) {
         case KILL_QUERY:
           TezSessionState sessionState = entry.getKey();
-          String queryId = sessionState.getTriggerContext().getQueryId();
+          String queryId = sessionState.getWmContext().getQueryId();
           try {
             KillQuery killQuery = sessionState.getKillQuery();
             // if kill query is null then session might have been released to pool or closed already

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 6fa3724..dd879fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -38,7 +38,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.security.auth.login.LoginException;
 
-import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -62,7 +61,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.KillQuery;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.security.Credentials;
@@ -85,6 +84,9 @@ import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
 import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
@@ -95,6 +97,7 @@ import com.google.common.cache.CacheBuilder;
 /**
  * Holds session state related to Tez
  */
+@JsonSerialize
 public class TezSessionState {
 
   protected static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName());
@@ -111,19 +114,24 @@ public class TezSessionState {
   private Future<TezClient> sessionFuture;
   /** Console used for user feedback during async session opening. */
   private LogHelper console;
+  @JsonProperty("sessionId")
   private String sessionId;
   private final DagUtils utils;
+  @JsonProperty("queueName")
   private String queueName;
+  @JsonProperty("defaultQueue")
   private boolean defaultQueue = false;
+  @JsonProperty("user")
   private String user;
 
   private AtomicReference<String> ownerThread = new AtomicReference<>(null);
 
   private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
   private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
+  @JsonProperty("doAsEnabled")
   private boolean doAsEnabled;
   private boolean isLegacyLlapMode;
-  private TriggerContext triggerContext;
+  private WmContext wmContext;
   private KillQuery killQuery;
 
   private static final Cache<String, String> shaCache = CacheBuilder.newBuilder().maximumSize(100).build();
@@ -852,12 +860,12 @@ public class TezSessionState {
     TezSessionPoolManager.getInstance().destroy(this);
   }
 
-  public TriggerContext getTriggerContext() {
-    return triggerContext;
+  public WmContext getWmContext() {
+    return wmContext;
   }
 
-  public void setTriggerContext(final TriggerContext triggerContext) {
-    this.triggerContext = triggerContext;
+  public void setWmContext(final WmContext wmContext) {
+    this.wmContext = wmContext;
   }
 
   public void setKillQuery(final KillQuery killQuery) {

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index af77f30..8795cfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -26,7 +26,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryInfo;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -63,7 +61,7 @@ import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -146,8 +144,8 @@ public class TezTask extends Task<TezWork> {
         // some DDL task that directly executes a TezTask does not setup Context and hence TriggerContext.
         // Setting queryId is messed up. Some DDL tasks have executionId instead of proper queryId.
         String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
-        TriggerContext triggerContext = new TriggerContext(System.currentTimeMillis(), queryId);
-        ctx.setTriggerContext(triggerContext);
+        WmContext wmContext = new WmContext(System.currentTimeMillis(), queryId);
+        ctx.setWmContext(wmContext);
       }
 
       // Need to remove this static hack. But this is the way currently to get a session.
@@ -158,7 +156,6 @@ public class TezTask extends Task<TezWork> {
       if (session != null && !session.isOpen()) {
         LOG.warn("The session: " + session + " has not been opened");
       }
-      Set<String> desiredCounters = new HashSet<>();
       // We only need a username for UGI to use for groups; getGroups will fetch the groups
       // based on Hadoop configuration, as documented at
       // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
@@ -166,15 +163,11 @@ public class TezTask extends Task<TezWork> {
       MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
         : new MappingInput(ss.getUserName(),
             UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
-      session = WorkloadManagerFederation.getSession(
-          session, conf, mi, getWork().getLlapMode(), desiredCounters);
+      WmContext wmContext = ctx.getWmContext();
+      session = WorkloadManagerFederation.getSession(session, conf, mi, getWork().getLlapMode(), wmContext);
 
-      TriggerContext triggerContext = ctx.getTriggerContext();
-      triggerContext.setDesiredCounters(desiredCounters);
-      LOG.info("Subscribed to counters: {} for queryId: {}",
-          desiredCounters, triggerContext.getQueryId());
+      LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), wmContext.getQueryId());
       ss.setTezSession(session);
-      session.setTriggerContext(triggerContext);
       try {
         // jobConf will hold all the configuration for hadoop, tez, and hive
         JobConf jobConf = utils.createConfiguration(conf);
@@ -256,12 +249,22 @@ public class TezTask extends Task<TezWork> {
         //       Currently, reopen on an attempted reuse will take care of that; we cannot tell
         //       if the session is usable until we try.
         // We return this to the pool even if it's unusable; reopen is supposed to handle this.
+        wmContext = ctx.getWmContext();
         try {
           session.returnToSessionManager();
         } catch (Exception e) {
           LOG.error("Failed to return session: {} to pool", session, e);
           throw e;
         }
+
+        if (!conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("none") &&
+          wmContext != null) {
+          if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("json")) {
+            wmContext.printJson(console);
+          } else if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("text")) {
+            wmContext.print(console);
+          }
+        }
       }
 
       if (LOG.isInfoEnabled() && counters != null
@@ -585,9 +588,9 @@ public class TezTask extends Task<TezWork> {
         console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
             + Arrays.toString(e.getStackTrace()) + " retrying...");
         // TODO: this is temporary, need to refactor how reopen is invoked.
-        TriggerContext oldCtx = sessionState.getTriggerContext();
+        WmContext oldCtx = sessionState.getWmContext();
         sessionState = sessionState.reopen(conf, inputOutputJars);
-        sessionState.setTriggerContext(oldCtx);
+        sessionState.setWmContext(oldCtx);
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (Exception retryException) {
         // we failed to submit after retrying. Destroy session and bail.

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index 5821659..6414f05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,17 +46,17 @@ public class TriggerValidatorRunnable implements Runnable {
       final List<TezSessionState> sessions = sessionTriggerProvider.getSessions();
       final List<Trigger> triggers = sessionTriggerProvider.getTriggers();
       for (TezSessionState sessionState : sessions) {
-        TriggerContext triggerContext = sessionState.getTriggerContext();
-        if (triggerContext != null && !triggerContext.isQueryCompleted()
-          && !triggerContext.getCurrentCounters().isEmpty()) {
-          Map<String, Long> currentCounters = triggerContext.getCurrentCounters();
+        WmContext wmContext = sessionState.getWmContext();
+        if (wmContext != null && !wmContext.isQueryCompleted()
+          && !wmContext.getCurrentCounters().isEmpty()) {
+          Map<String, Long> currentCounters = wmContext.getCurrentCounters();
           for (Trigger currentTrigger : triggers) {
             String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName();
             // there could be interval where desired counter value is not populated by the time we make this check
             if (currentCounters.containsKey(desiredCounter)) {
               long currentCounterValue = currentCounters.get(desiredCounter);
               if (currentTrigger.apply(currentCounterValue)) {
-                String queryId = sessionState.getTriggerContext().getQueryId();
+                String queryId = sessionState.getWmContext().getQueryId();
                 if (violatedSessions.containsKey(sessionState)) {
                   // session already has a violation
                   Trigger existingTrigger = violatedSessions.get(sessionState);
@@ -84,7 +84,7 @@ public class TriggerValidatorRunnable implements Runnable {
 
           Trigger chosenTrigger = violatedSessions.get(sessionState);
           if (chosenTrigger != null) {
-            LOG.info("Query: {}. {}. Applying action.", sessionState.getTriggerContext().getQueryId(),
+            LOG.info("Query: {}. {}. Applying action.", sessionState.getWmContext().getQueryId(),
               chosenTrigger.getViolationMsg());
           }
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
new file mode 100644
index 0000000..33341ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Workload Manager events at query level.
+ */
+@JsonSerialize
+public class WmEvent {
+  private static final Logger LOG = LoggerFactory.getLogger(WmEvent.class);
+  enum EventType {
+    GET, // get session
+    KILL, // kill query
+    DESTROY, // destroy session
+    RESTART, // restart session
+    RETURN, // return session back to pool
+    MOVE // move session to different pool
+  }
+
+  // snapshot of subset of wm tez session info for printing in events summary
+  @JsonSerialize
+  public static class WmTezSessionInfo {
+    @JsonProperty("sessionId")
+    private final String sessionId;
+    @JsonProperty("poolName")
+    private final String poolName;
+    @JsonProperty("clusterPercent")
+    private final double clusterPercent;
+
+    WmTezSessionInfo(WmTezSession wmTezSession) {
+      this.poolName = wmTezSession.getPoolName();
+      this.sessionId = wmTezSession.getSessionId();
+      this.clusterPercent = wmTezSession.getClusterFraction() * 100.0;
+    }
+
+    public String getPoolName() {
+      return poolName;
+    }
+
+    public String getSessionId() {
+      return sessionId;
+    }
+
+    public double getClusterPercent() {
+      return clusterPercent;
+    }
+
+    @Override
+    public String toString() {
+      return "SessionId: " + sessionId + " Pool: " + poolName +  " Cluster %: " + clusterPercent;
+    }
+  }
+
+  @JsonProperty("wmTezSessionInfo")
+  private WmTezSessionInfo wmTezSessionInfo;
+  @JsonProperty("eventStartTimestamp")
+  private long eventStartTimestamp;
+  @JsonProperty("eventEndTimestamp")
+  private long eventEndTimestamp;
+  @JsonProperty("eventType")
+  private final EventType eventType;
+  @JsonProperty("elapsedTime")
+  private long elapsedTime;
+
+  WmEvent(final EventType eventType) {
+    this.eventType = eventType;
+    this.eventStartTimestamp = System.currentTimeMillis();
+  }
+
+  public long getEventStartTimestamp() {
+    return eventStartTimestamp;
+  }
+
+  public EventType getEventType() {
+    return eventType;
+  }
+
+  public WmTezSessionInfo getWmTezSessionInfo() {
+    return wmTezSessionInfo;
+  }
+
+  public long getEventEndTimestamp() {
+    return eventEndTimestamp;
+  }
+
+  public long getElapsedTime() {
+    return elapsedTime;
+  }
+
+  public void endEvent(final WmTezSession sessionState) {
+    this.wmTezSessionInfo = new WmTezSessionInfo(sessionState);
+    this.eventEndTimestamp = System.currentTimeMillis();
+    this.elapsedTime = eventEndTimestamp - eventStartTimestamp;
+    WmContext wmContext = sessionState.getWmContext();
+    if (wmContext != null) {
+      wmContext.addWMEvent(this);
+      LOG.info("Added WMEvent: {}", this);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "EventType: " + eventType + " EventStartTimestamp: " + eventStartTimestamp + " elapsedTime: " +
+      elapsedTime + " wmTezSessionInfo:" + wmTezSessionInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index d61c531..e78ef44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -21,28 +21,40 @@ package org.apache.hadoop.hive.ql.exec.tez;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 import org.apache.hive.common.util.Ref;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
 
+@JsonSerialize
 public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
+  @JsonProperty("poolName")
   private String poolName;
+  @JsonProperty("clusterFraction")
   private double clusterFraction;
   /**
    * The reason to kill an AM. Note that this is for the entire session, not just for a query.
    * Once set, this can never be unset because you can only kill the session once.
    */
+  @JsonProperty("killReason")
   private String killReason = null;
 
   private final Object amPluginInfoLock = new Object();
+  @JsonProperty("amPluginInfo")
   private AmPluginInfo amPluginInfo = null;
-  private Integer amPluginendpointVersion =  null;
+  private Integer amPluginEndpointVersion =  null;
   private SettableFuture<WmTezSession> amRegistryFuture = null;
   private ScheduledFuture<?> timeoutTimer = null;
+  @JsonProperty("queryId")
   private String queryId;
+  private SettableFuture<Boolean> returnFuture = null;
 
   private final WorkloadManager wmParent;
 
@@ -99,12 +111,12 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     synchronized (amPluginInfoLock) {
       // Ignore the outdated updates; for the same version, ignore non-null updates because
       // we assume that removal is the last thing that happens for any given version.
-      if ((amPluginendpointVersion != null) && ((amPluginendpointVersion > ephSeqVersion)
-          || (amPluginendpointVersion == ephSeqVersion && info != null))) {
+      if ((amPluginEndpointVersion != null) && ((amPluginEndpointVersion > ephSeqVersion)
+          || (amPluginEndpointVersion == ephSeqVersion && info != null))) {
         LOG.info("Ignoring an outdated info update {}: {}", ephSeqVersion, si);
         return;
       }
-      this.amPluginendpointVersion = ephSeqVersion;
+      this.amPluginEndpointVersion = ephSeqVersion;
       this.amPluginInfo = info;
       if (info != null) {
         // Only update someone waiting for info if we have the info.
@@ -123,7 +135,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   @Override
   public AmPluginInfo getAmPluginInfo(Ref<Integer> version) {
     synchronized (amPluginInfoLock) {
-      version.value = amPluginendpointVersion;
+      version.value = amPluginEndpointVersion;
       return amPluginInfo;
     }
   }
@@ -132,7 +144,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     this.poolName = poolName;
   }
 
-  String getPoolName() {
+  public String getPoolName() {
     return poolName;
   }
 
@@ -145,7 +157,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     this.clusterFraction = 0f;
   }
 
-  double getClusterFraction() {
+  public double getClusterFraction() {
     return this.clusterFraction;
   }
 
@@ -235,6 +247,20 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     return this.queryId;
   }
 
+  void createAndSetReturnFuture() {
+    this.returnFuture = SettableFuture.create();
+    if (getWmContext() != null) {
+      getWmContext().setReturnEventFuture(returnFuture);
+    }
+  }
+
+  void resolveReturnFuture() {
+    if (returnFuture != null) {
+      returnFuture.set(true);
+      returnFuture = null;
+    }
+  }
+
   @Override
   public String toString() {
     return super.toString() +  ", WM state poolName=" + poolName + ", clusterFraction="

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index ecdcf12..dbdbbf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,20 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import com.google.common.collect.Lists;
-
-import java.util.concurrent.ExecutionException;
-
-import java.util.Collection;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -42,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -50,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -68,11 +58,24 @@ import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hive.common.util.Ref;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 
 /** Workload management entry point for HS2.
  * Note on how this class operates.
@@ -92,6 +95,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private static final char POOL_SEPARATOR = '.';
   private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
 
+  private final ObjectMapper objectMapper;
   // Various final services, configs, etc.
   private final HiveConf conf;
   private final TezSessionPool<WmTezSession> tezAmPool;
@@ -112,6 +116,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private Map<String, PoolState> pools;
   private String rpName, defaultPool; // For information only.
   private int totalQueryParallelism;
+
   /**
    * The queries being killed. This is used to sync between the background kill finishing and the
    * query finishing and user returning the sessions, which can happen in separate iterations
@@ -211,6 +216,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     wmThread.start();
 
     updateResourcePlanAsync(plan).get(); // Wait for the initial resource plan to be applied.
+
+    objectMapper = new ObjectMapper();
+    objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+    // serialize json based on field annotations only
+    objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
+      .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
+      .withSetterVisibility(JsonAutoDetect.Visibility.NONE));
   }
 
   private static int determineQueryParallelism(WMFullResourcePlan plan) {
@@ -393,10 +405,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         KillQuery kq = toKill.getKillQuery();
         try {
           if (kq != null && queryId != null) {
+            WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
             LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
             try {
               kq.killQuery(queryId, reason);
               addKillQueryResult(toKill, true);
+              killCtx.killSessionFuture.set(true);
+              wmEvent.endEvent(toKill);
               LOG.debug("Killed " + queryId);
               return;
             } catch (HiveException ex) {
@@ -423,8 +438,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       toRestart.setQueryId(null);
       workPool.submit(() -> {
         try {
+          WmEvent wmEvent = new WmEvent(WmEvent.EventType.RESTART);
           // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
           tezAmPool.replaceSession(toRestart, false, null);
+          wmEvent.endEvent(toRestart);
         } catch (Exception ex) {
           LOG.error("Failed to restart an old session; ignoring", ex);
         }
@@ -437,7 +454,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       LOG.info("Closing {} without restart", toDestroy);
       workPool.submit(() -> {
         try {
+          WmEvent wmEvent = new WmEvent(WmEvent.EventType.DESTROY);
           toDestroy.close(false);
+          wmEvent.endEvent(toDestroy);
         } catch (Exception ex) {
           LOG.error("Failed to close an old session; ignoring " + ex.getMessage());
         }
@@ -513,9 +532,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           e, sessionToReturn, poolsToRedistribute, true);
       switch (rr) {
       case OK:
+        WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
         boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
         if (!wasReturned) {
           syncWork.toDestroyNoRestart.add(sessionToReturn);
+        } else {
+          if (sessionToReturn.getWmContext() != null && sessionToReturn.getWmContext().isQueryCompleted()) {
+            sessionToReturn.resolveReturnFuture();
+          }
+          wmEvent.endEvent(sessionToReturn);
         }
         break;
       case NOT_FOUND:
@@ -563,8 +588,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // We could consider delaying the move (when destination capacity is full) until there is claim in src pool.
     // May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long
     // as possible
+    Map<WmTezSession, WmEvent> recordMoveEvents = new HashMap<>();
     for (MoveSession moveSession : e.moveSessions) {
-      handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse);
+      handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse, recordMoveEvents);
     }
     e.moveSessions.clear();
 
@@ -590,13 +616,21 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       case OK: {
         iter.remove();
         LOG.debug("Kill query succeeded; returning to the pool: {}", ctx.session);
+        ctx.killSessionFuture.set(true);
+        WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
         if (!tezAmPool.returnSessionAsync(ctx.session)) {
           syncWork.toDestroyNoRestart.add(ctx.session);
+        } else {
+          if (ctx.session.getWmContext() != null && ctx.session.getWmContext().isQueryCompleted()) {
+            ctx.session.resolveReturnFuture();
+          }
+          wmEvent.endEvent(ctx.session);
         }
         break;
       }
       case RESTART_REQUIRED: {
         iter.remove();
+        ctx.killSessionFuture.set(true);
         LOG.debug("Kill query failed; restarting: {}", ctx.session);
         // Note: we assume here the session, before we resolve killQuery result here, is still
         //       "in use". That is because all the user ops above like return, reopen, etc.
@@ -620,7 +654,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       if (LOG.isDebugEnabled()) {
         LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
       }
-      processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
+      processPoolChangesOnMasterThread(poolName, hasRequeues);
     }
 
 
@@ -631,7 +665,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       }
     }
 
-    // 13. Notify tests and global async ops.
+    // 13. To record move events, we need the cluster fraction updates that happen at step 11.
+    for (Map.Entry<WmTezSession, WmEvent> entry : recordMoveEvents.entrySet()) {
+      entry.getValue().endEvent(entry.getKey());
+    }
+
+    // 14. Notify tests and global async ops.
     if (e.dumpStateFuture != null) {
       List<String> result = new ArrayList<>();
       result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
@@ -676,11 +715,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSyncWork syncWork,
-      Set<String> poolsToRedistribute, Map<WmTezSession, GetRequest> toReuse) {
+  private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
+    final WmThreadSyncWork syncWork,
+    final HashSet<String> poolsToRedistribute,
+    final Map<WmTezSession, GetRequest> toReuse,
+    final Map<WmTezSession, WmEvent> recordMoveEvents) {
     String destPoolName = moveSession.destPool;
     LOG.info("Handling move session event: {}", moveSession);
     if (validMove(moveSession.srcSession, destPoolName)) {
+      WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
       // remove from src pool
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
           moveSession.srcSession, poolsToRedistribute, true);
@@ -692,15 +735,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
               moveSession.srcSession, destPoolName, poolsToRedistribute);
           if (added != null && added) {
             moveSession.future.set(true);
+            recordMoveEvents.put(moveSession.srcSession, moveEvent);
             return;
           } else {
             LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession);
           }
         } else {
           WmTezSession session = moveSession.srcSession;
-          resetRemovedSessionToKill(session, toReuse);
-          syncWork.toKillQuery.put(session, new KillQueryContext(session, "Destination pool "
-              + destPoolName + " is full. Killing query."));
+          KillQueryContext killQueryContext = new KillQueryContext(session, "Destination pool " + destPoolName +
+            " is full. Killing query.");
+          resetAndQueueKill(syncWork.toKillQuery, killQueryContext, toReuse);
         }
       } else {
         LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession);
@@ -808,7 +852,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     case OK:
       // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
       PoolState pool = pools.get(poolName);
-      SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId());
+      SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext());
       // We have just removed the session from the same pool, so don't check concurrency here.
       pool.initializingSessions.add(sw);
       ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
@@ -918,6 +962,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         totalQueryParallelism += qp;
       }
     }
+    // TODO: in the current impl, triggers are added to RP. For tez, no pool triggers (mapping between trigger name and
+    // pool name) will exist, which means all triggers apply to tez. For LLAP, pool triggers have to exist for attaching
+    // triggers to specific pools.
+    // For usability,
+    // Provide a way for triggers sharing/inheritance possibly with following modes
+    // ONLY - only to pool
+    // INHERIT - child pools inherit from parent
+    // GLOBAL - all pools inherit
     if (e.resourcePlanToApply.isSetTriggers() && e.resourcePlanToApply.isSetPoolTriggers()) {
       Map<String, Trigger> triggers = new HashMap<>();
       for (WMTrigger trigger : e.resourcePlanToApply.getTriggers()) {
@@ -1036,8 +1088,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   }
 
 
-  private void processPoolChangesOnMasterThread(
-    String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception {
+  private void processPoolChangesOnMasterThread(String poolName, boolean hasRequeues) throws Exception {
     PoolState pool = pools.get(poolName);
     if (pool == null) return; // Might be from before the new resource plan.
 
@@ -1058,7 +1109,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // Note that in theory, we are guaranteed to have a session waiting for us here, but
       // the expiration, failures, etc. may cause one to be missing pending restart.
       // See SessionInitContext javadoc.
-      SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId);
+      SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId,
+        queueReq.wmContext);
       ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
       Futures.addCallback(getFuture, sw);
       // It is possible that all the async methods returned on the same thread because the
@@ -1097,8 +1149,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       assert isOk || rr == RemoveSessionResult.IGNORE;
       if (!isOk) return;
     }
+    WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
     if (!tezAmPool.returnSessionAsync(session)) {
       syncWork.toDestroyNoRestart.add(session);
+    } else {
+      if (session.getWmContext() != null && session.getWmContext().isQueryCompleted()) {
+        session.resolveReturnFuture();
+      }
+      wmEvent.endEvent(session);
     }
   }
 
@@ -1159,10 +1217,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     PoolState destPool = pools.get(destPoolName);
     if (destPool != null && destPool.sessions.add(session)) {
       session.setPoolName(destPoolName);
+      updateTriggers(session);
       poolsToRedistribute.add(destPoolName);
       return true;
     }
-    LOG.error("Session {} was not not added to pool {}", session, destPoolName);
+    LOG.error("Session {} was not added to pool {}", session, destPoolName);
     return null;
   }
 
@@ -1188,7 +1247,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return applyRpFuture;
   }
 
-  public Future<Boolean> applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) {
+  Future<Boolean> applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) {
     currentLock.lock();
     MoveSession moveSession;
     try {
@@ -1202,28 +1261,42 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return moveSession.future;
   }
 
+  Future<Boolean> applyKillSessionAsync(WmTezSession wmTezSession, String killReason) {
+    KillQueryContext killQueryContext;
+    currentLock.lock();
+    try {
+      killQueryContext = new KillQueryContext(wmTezSession, killReason);
+      resetAndQueueKill(syncWork.toKillQuery, killQueryContext, current.toReuse);
+      LOG.info("Queued session for kill: {}", killQueryContext.session);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    return killQueryContext.killSessionFuture;
+  }
+
   private final static class GetRequest {
-    public static final Comparator<GetRequest> ORDER_COMPARATOR = new Comparator<GetRequest>() {
-      @Override
-      public int compare(GetRequest o1, GetRequest o2) {
-        if (o1.order == o2.order) return 0;
-        return o1.order < o2.order ? -1 : 1;
-      }
+    public static final Comparator<GetRequest> ORDER_COMPARATOR = (o1, o2) -> {
+      if (o1.order == o2.order) return 0;
+      return o1.order < o2.order ? -1 : 1;
     };
     private final long order;
     private final MappingInput mappingInput;
     private final SettableFuture<WmTezSession> future;
     private WmTezSession sessionToReuse;
     private final String queryId;
+    private final WmContext wmContext;
 
     private GetRequest(MappingInput mappingInput, String queryId,
-        SettableFuture<WmTezSession> future,  WmTezSession sessionToReuse, long order) {
+      SettableFuture<WmTezSession> future, WmTezSession sessionToReuse, long order,
+      final WmContext wmContext) {
       assert mappingInput != null;
       this.mappingInput = mappingInput;
       this.queryId = queryId;
       this.future = future;
       this.sessionToReuse = sessionToReuse;
       this.order = order;
+      this.wmContext = wmContext;
     }
 
     @Override
@@ -1232,15 +1305,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  public TezSessionState getSession(
-      TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
+  public WmTezSession getSession(
+    TezSessionState session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception {
+    WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
     // Note: not actually used for pool sessions; verify some things like doAs are not set.
     validateConfig(conf);
     String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
     SettableFuture<WmTezSession> future = SettableFuture.create();
     WmTezSession wmSession = checkSessionForReuse(session);
     GetRequest req = new GetRequest(
-        input, queryId, future, wmSession, getRequestVersion.incrementAndGet());
+        input, queryId, future, wmSession, getRequestVersion.incrementAndGet(), wmContext);
     currentLock.lock();
     try {
       current.getRequests.add(req);
@@ -1252,7 +1326,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     } finally {
       currentLock.unlock();
     }
-    return future.get();
+    WmTezSession sessionState = future.get();
+    wmEvent.endEvent(sessionState);
+    return sessionState;
   }
 
   @Override
@@ -1283,6 +1359,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     resetGlobalTezSession(wmTezSession);
     currentLock.lock();
     try {
+      wmTezSession.createAndSetReturnFuture();
       current.toReturn.add(wmTezSession);
       notifyWmThreadUnderLock();
     } finally {
@@ -1388,7 +1465,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     if (sessionConf == null) {
       LOG.warn("Session configuration is null for " + wmTezSession);
       sessionConf = new HiveConf(conf, WorkloadManager.class);
-
     }
     // TODO: ideally, we should handle reopen the same way no matter what. However, the cases
     //       with additional files will have to wait until HIVE-17827 is unfucked, because it's
@@ -1519,23 +1595,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return conf;
   }
 
-  public List<String> getTriggerCounterNames(final TezSessionState session) {
-    if (session instanceof WmTezSession) {
-      WmTezSession wmTezSession = (WmTezSession) session;
-      String poolName = wmTezSession.getPoolName();
-      PoolState poolState = pools.get(poolName);
-      if (poolState != null) {
-        List<String> counterNames = new ArrayList<>();
-        List<Trigger> triggers = poolState.getTriggers();
-        if (triggers != null) {
-          for (Trigger trigger : triggers) {
-            counterNames.add(trigger.getExpression().getCounterLimit().getName());
-          }
-        }
-        return counterNames;
-      }
+  void updateTriggers(final WmTezSession session) {
+    WmContext wmContext = session.getWmContext();
+    String poolName = session.getPoolName();
+    PoolState poolState = pools.get(poolName);
+    if (wmContext != null && poolState != null) {
+      wmContext.addTriggers(poolState.getTriggers());
+      LOG.info("Subscribed to counters: {}", wmContext.getSubscribedCounters());
     }
-    return null;
   }
 
   @Override
@@ -1613,6 +1680,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       return sessions;
     }
 
+    public LinkedList<SessionInitContext> getInitializingSessions() {
+      return initializingSessions;
+    }
+
     @Override
     public String toString() {
       return "[" + fullName + ", query parallelism " + queryParallelism
@@ -1625,8 +1696,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         IdentityHashMap<WmTezSession, GetRequest> toReuse,
         Map<WmTezSession, KillQueryContext> toKill) {
       for (WmTezSession sessionToKill : sessions) {
-        resetRemovedSessionToKill(sessionToKill, toReuse);
-        toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
+        resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
       }
       sessions.clear();
       for (SessionInitContext initCtx : initializingSessions) {
@@ -1636,8 +1706,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         if (sessionToKill == null) {
           continue; // Async op in progress; the callback will take care of this.
         }
-        resetRemovedSessionToKill(sessionToKill, toReuse);
-        toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
+        resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
       }
       initializingSessions.clear();
     }
@@ -1671,12 +1740,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private SettableFuture<WmTezSession> future;
     private SessionInitState state;
     private String cancelReason;
+    private WmContext wmContext;
 
-    public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId) {
+    public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId,
+      final WmContext wmContext) {
       this.state = SessionInitState.GETTING;
       this.future = future;
       this.poolName = poolName;
       this.queryId = queryId;
+      this.wmContext = wmContext;
     }
 
     @Override
@@ -1693,6 +1765,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           session.setPoolName(poolName);
           session.setQueueName(yarnQueue);
           session.setQueryId(queryId);
+          session.setWmContext(wmContext);
           this.session = session;
           this.state = SessionInitState.WAITING_FOR_REGISTRY;
           break;
@@ -1740,6 +1813,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         session.setPoolName(null);
         session.setClusterFraction(0f);
         session.setQueryId(null);
+        session.setWmContext(null);
         tezAmPool.returnSession(session);
         break;
       }
@@ -1858,16 +1932,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
    * like the session even before we kill it, or the kill fails and the user is happily computing
    * away. This class is to collect and make sense of the state around all this.
    */
-  private static final class KillQueryContext {
+  static final class KillQueryContext {
+    private SettableFuture<Boolean> killSessionFuture;
     private final String reason;
     private final WmTezSession session;
     // Note: all the fields are only modified by master thread.
     private boolean isUserDone = false, isKillDone = false,
         hasKillFailed = false, hasUserFailed = false;
 
-    public KillQueryContext(WmTezSession session, String reason) {
+    KillQueryContext(WmTezSession session, String reason) {
       this.session = session;
       this.reason = reason;
+      this.killSessionFuture = SettableFuture.create();
     }
 
     private void handleKillQueryCallback(boolean hasFailed) {
@@ -1912,10 +1988,36 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  private static void resetRemovedSessionToKill(
-      WmTezSession sessionToKill, Map<WmTezSession, GetRequest> toReuse) {
-    sessionToKill.clearWm();
-    GetRequest req = toReuse.remove(sessionToKill);
+  private static void resetRemovedSessionToKill(Map<WmTezSession, KillQueryContext> toKillQuery,
+    KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
+    toKillQuery.put(killQueryContext.session, killQueryContext);
+    killQueryContext.session.clearWm();
+    GetRequest req = toReuse.remove(killQueryContext.session);
+    if (req != null) {
+      req.sessionToReuse = null;
+    }
+  }
+
+  private void resetAndQueueKill(Map<WmTezSession, KillQueryContext> toKillQuery,
+    KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
+
+    WmTezSession toKill = killQueryContext.session;
+    toKillQuery.put(toKill, killQueryContext);
+
+    // The way this works is, a session in a WM pool will move back to the tez AM pool on a kill and will get
+    // reassigned back to a WM pool on GetRequest based on user-pool mapping. Only if we remove the session from the
+    // active sessions list of its WM pool will the queued GetRequest be processed
+    String poolName = toKill.getPoolName();
+    if (poolName != null) {
+      PoolState poolState = pools.get(poolName);
+      if (poolState != null) {
+        poolState.getSessions().remove(toKill);
+        poolState.getInitializingSessions().remove(toKill);
+      }
+    }
+
+    toKill.clearWm();
+    GetRequest req = toReuse.remove(toKill);
     if (req != null) {
       req.sessionToReuse = null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
index 0a9fa72..9d56204 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,51 +16,57 @@
  * limitations under the License.
  */package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.slf4j.LoggerFactory;
-
-import org.slf4j.Logger;
-
+import java.util.HashSet;
 import java.util.Set;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
+import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class WorkloadManagerFederation {
   private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class);
 
   public static TezSessionState getSession(TezSessionState session, HiveConf conf,
-      MappingInput input, boolean isUnmanagedLlapMode, Set<String> desiredCounters) throws Exception {
+    MappingInput input, boolean isUnmanagedLlapMode, final WmContext wmContext) throws Exception {
+    Set<String> desiredCounters = new HashSet<>();
     // 1. If WM is not present just go to unmanaged.
     WorkloadManager wm = WorkloadManager.getInstance();
     if (wm == null) {
       LOG.debug("Using unmanaged session - WM is not initialized");
-      return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
+      return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode, wmContext);
     }
     // 2. We will ask WM for a preliminary mapping. This allows us to escape to the unmanaged path
     //    quickly in the common case. It's still possible that resource plan will be updated and
     //    our preliminary mapping won't work out. We'll handle that below.
     if (!wm.isManaged(input)) {
       LOG.info("Using unmanaged session - no mapping for " + input);
-      return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
+      return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode, wmContext);
     }
 
     // 3. Finally, try WM.
     try {
       // Note: this may just block to wait for a session based on parallelism.
       LOG.info("Getting a WM session for " + input);
-      TezSessionState result = wm.getSession(session, input, conf);
-      desiredCounters.addAll(wm.getTriggerCounterNames(result));
+      WmTezSession result = wm.getSession(session, input, conf, wmContext);
+      result.setWmContext(wmContext);
+      wm.updateTriggers(result);
       return result;
     } catch (WorkloadManager.NoPoolMappingException ex) {
       LOG.info("NoPoolMappingException thrown. Getting an un-managed session..");
-      return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
+      return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode, wmContext);
     }
   }
 
   private static TezSessionState getUnmanagedSession(
-      TezSessionState session, HiveConf conf, Set<String> desiredCounters, boolean isWorkLlapNode) throws Exception {
+    TezSessionState session, HiveConf conf, Set<String> desiredCounters, boolean isWorkLlapNode,
+    final WmContext wmContext) throws Exception {
     TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
     session = pm.getSession(session, conf, false, isWorkLlapNode);
     desiredCounters.addAll(pm.getTriggerCounterNames());
+    wmContext.setSubscribedCounters(desiredCounters);
+    session.setWmContext(wmContext);
     return session;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
index 5bb6bf1..8414c73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
@@ -19,6 +19,6 @@ package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.ql.session.SessionState;
 
-interface PrintSummary {
+public interface PrintSummary {
   void print(SessionState.LogHelper console);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 3dd4b31..9726af1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.wm.TimeCounterLimit;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hadoop.hive.ql.wm.VertexCounterLimit;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.tez.common.counters.CounterGroup;
@@ -156,7 +156,7 @@ public class TezJobMonitor {
     boolean running = false;
 
     long checkInterval = MIN_CHECK_INTERVAL;
-    TriggerContext triggerContext = null;
+    WmContext wmContext = null;
     while (true) {
 
       try {
@@ -167,12 +167,12 @@ public class TezJobMonitor {
         status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval);
         TezCounters dagCounters = status.getDAGCounters();
         vertexProgressMap = status.getVertexProgress();
-        triggerContext = context.getTriggerContext();
-        if (dagCounters != null && triggerContext != null) {
-          Set<String> desiredCounters = triggerContext.getDesiredCounters();
+        wmContext = context.getWmContext();
+        if (dagCounters != null && wmContext != null) {
+          Set<String> desiredCounters = wmContext.getSubscribedCounters();
           if (desiredCounters != null && !desiredCounters.isEmpty()) {
             Map<String, Long> currentCounters = getCounterValues(dagCounters, vertexProgressMap, desiredCounters, done);
-            triggerContext.setCurrentCounters(currentCounters);
+            wmContext.setCurrentCounters(currentCounters);
           }
         }
         DAGStatus.State state = status.getState();
@@ -234,8 +234,8 @@ public class TezJobMonitor {
               break;
           }
         }
-        if (triggerContext != null && done) {
-          triggerContext.setQueryCompleted(true);
+        if (wmContext != null && done) {
+          wmContext.setQueryCompleted(true);
         }
       } catch (Exception e) {
         console.printInfo("Exception: " + e.getMessage());
@@ -263,13 +263,13 @@ public class TezJobMonitor {
         } else {
           console.printInfo("Retrying...");
         }
-        if (triggerContext != null && done) {
-          triggerContext.setQueryCompleted(true);
+        if (wmContext != null && done) {
+          wmContext.setQueryCompleted(true);
         }
       } finally {
         if (done) {
-          if (triggerContext != null && done) {
-            triggerContext.setQueryCompleted(true);
+          if (wmContext != null && done) {
+            wmContext.setQueryCompleted(true);
           }
           if (rc != 0 && status != null) {
             for (String diag : status.getDiagnostics()) {
@@ -324,7 +324,7 @@ public class TezJobMonitor {
     if (!done) {
       counterName = TimeCounterLimit.TimeCounter.ELAPSED_TIME.name();
       if (desiredCounters.contains(counterName)) {
-        updatedCounters.put(counterName, context.getTriggerContext().getElapsedTime());
+        updatedCounters.put(counterName, context.getWmContext().getElapsedTime());
       }
 
       counterName = TimeCounterLimit.TimeCounter.EXECUTION_TIME.name();
@@ -351,6 +351,7 @@ public class TezJobMonitor {
         new LLAPioSummary(progressMap, dagClient).print(console);
         new FSCountersSummary(progressMap, dagClient).print(console);
       }
+
       console.printInfo("");
     }
   }