You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/12/06 08:31:00 UTC
[2/2] hive git commit: HIVE-18088: Add WM event traces at query level
for debugging (Prasanth Jayachandran reviewed by Sergey Shelukhin)
HIVE-18088: Add WM event traces at query level for debugging (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3060b30
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3060b30
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3060b30
Branch: refs/heads/master
Commit: a3060b30705d1bd55c6be5357f7575534c84e6b0
Parents: 0d83233
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Dec 6 00:30:42 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Dec 6 00:30:42 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
itests/hive-unit/pom.xml | 18 +-
.../hive/jdbc/AbstractJdbcTriggersTest.java | 73 ++++--
.../jdbc/TestTriggersMoveWorkloadManager.java | 109 ++++++++-
.../java/org/apache/hadoop/hive/ql/Context.java | 12 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 6 +-
.../hadoop/hive/ql/exec/tez/AmPluginNode.java | 13 +-
.../exec/tez/KillMoveTriggerActionHandler.java | 51 ++--
.../ql/exec/tez/KillTriggerActionHandler.java | 4 +-
.../hive/ql/exec/tez/TezSessionState.java | 22 +-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 33 +--
.../ql/exec/tez/TriggerValidatorRunnable.java | 14 +-
.../apache/hadoop/hive/ql/exec/tez/WmEvent.java | 127 ++++++++++
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 40 +++-
.../hive/ql/exec/tez/WorkloadManager.java | 234 +++++++++++++------
.../ql/exec/tez/WorkloadManagerFederation.java | 30 ++-
.../ql/exec/tez/monitoring/PrintSummary.java | 2 +-
.../ql/exec/tez/monitoring/TezJobMonitor.java | 27 +--
.../hooks/PostExecWMEventsSummaryPrinter.java | 62 +++++
.../org/apache/hadoop/hive/ql/wm/Trigger.java | 3 +
.../hadoop/hive/ql/wm/TriggerActionHandler.java | 4 +-
.../hadoop/hive/ql/wm/TriggerContext.java | 74 ------
.../org/apache/hadoop/hive/ql/wm/WmContext.java | 233 ++++++++++++++++++
.../hive/ql/exec/tez/TestWorkloadManager.java | 96 ++++----
24 files changed, 964 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3be5a8d..75b7707 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3052,6 +3052,10 @@ public class HiveConf extends Configuration {
"hive.tez.exec.print.summary",
false,
"Display breakdown of execution steps, for every query executed by the shell."),
+ TEZ_SESSION_EVENTS_SUMMARY(
+ "hive.tez.session.events.print.summary",
+ "none", new StringSet("none", "text", "json"),
+ "Display summary of all tez sessions related events in text or json format"),
TEZ_EXEC_INPLACE_PROGRESS(
"hive.tez.exec.inplace.progress",
true,
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index ea5b7b9..3a435a8 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -361,7 +361,23 @@
<version>${tez.version}</version>
<scope>test</scope>
<exclusions>
- <exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 235e6c3..62ee66f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -21,7 +21,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.PrintStream;
import java.net.URL;
import java.sql.Connection;
import java.sql.SQLException;
@@ -103,7 +105,7 @@ public abstract class AbstractJdbcTriggersTest {
}
}
- void createSleepUDF() throws SQLException {
+ private void createSleepUDF() throws SQLException {
String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
Connection con = hs2Conn;
Statement stmt = con.createStatement();
@@ -112,40 +114,65 @@ public abstract class AbstractJdbcTriggersTest {
}
void runQueryWithTrigger(final String query, final List<String> setCmds,
- final String expect)
+ final String expect) throws Exception {
+ runQueryWithTrigger(query, setCmds, expect, null);
+ }
+
+ void runQueryWithTrigger(final String query, final List<String> setCmds,
+ final String expect, final List<String> errCaptureExpect)
throws Exception {
Connection con = hs2Conn;
TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
createSleepUDF();
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ System.setErr(new PrintStream(baos)); // capture stderr
final Statement selStmt = con.createStatement();
final Throwable[] throwable = new Throwable[1];
- Thread queryThread = new Thread(() -> {
- try {
- if (setCmds != null) {
- for (String setCmd : setCmds) {
- selStmt.execute(setCmd);
+ try {
+ Thread queryThread = new Thread(() -> {
+ try {
+ if (setCmds != null) {
+ for (String setCmd : setCmds) {
+ selStmt.execute(setCmd);
+ }
}
+ selStmt.execute(query);
+ } catch (SQLException e) {
+ throwable[0] = e;
+ }
+ });
+ queryThread.start();
+
+ queryThread.join();
+ selStmt.close();
+
+ if (expect == null) {
+ assertNull("Expected query to succeed", throwable[0]);
+ } else {
+ assertNotNull("Expected non-null throwable", throwable[0]);
+ assertEquals(SQLException.class, throwable[0].getClass());
+ assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
+ throwable[0].getMessage().contains(expect));
+ }
+
+ if (errCaptureExpect != null && !errCaptureExpect.isEmpty()) {
+ // failure hooks are run after HiveStatement is closed. wait some time for the failure hook to execute
+ String stdErrStr = "";
+ while (!stdErrStr.contains(errCaptureExpect.get(0))) {
+ baos.flush();
+ stdErrStr = baos.toString();
+ Thread.sleep(500);
+ }
+ for (String errExpect : errCaptureExpect) {
+ assertTrue("'" + errExpect + "' expected in STDERR capture, but not found.", stdErrStr.contains(errExpect));
}
- selStmt.execute(query);
- } catch (SQLException e) {
- throwable[0] = e;
}
- });
- queryThread.start();
-
- queryThread.join();
- selStmt.close();
-
- if (expect == null) {
- assertNull("Expected query to succeed", throwable[0]);
- } else {
- assertNotNull("Expected non-null throwable", throwable[0]);
- assertEquals(SQLException.class, throwable[0].getClass());
- assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
- throwable[0].getMessage().contains(expect));
+ } finally {
+ baos.close();
}
+
}
abstract void setupTriggers(final List<Trigger> triggers) throws Exception;
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
index a983855..74ca958 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java
@@ -18,16 +18,9 @@ package org.apache.hive.jdbc;
import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.plan;
import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.pool;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import java.io.File;
import java.net.URL;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -94,10 +87,30 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
new Action(Action.Type.MOVE_TO_POOL, "ETL"));
Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
new Action(Action.Type.KILL_QUERY));
- setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList(killTrigger));
+ setupTriggers(Lists.newArrayList(moveTrigger), Lists.newArrayList(killTrigger));
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
- runQueryWithTrigger(query, null, killTrigger + " violated");
+ List<String> setCmds = new ArrayList<>();
+ setCmds.add("set hive.tez.session.events.print.summary=json");
+ setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ List<String> errCaptureExpect = new ArrayList<>();
+ errCaptureExpect.add("Workload Manager Events Summary");
+ errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+ errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00");
+ errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00");
+ errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+ errCaptureExpect.add("\"eventType\" : \"GET\"");
+ errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+ errCaptureExpect.add("\"eventType\" : \"KILL\"");
+ errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+ errCaptureExpect.add("\"name\" : \"slow_query_move\"");
+ errCaptureExpect.add("\"name\" : \"slow_query_kill\"");
+ // violation in BI queue
+ errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger + " violated");
+ // violation in ETL queue
+ errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated");
+ runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
}
@Test(timeout = 60000)
@@ -111,7 +124,65 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col==t2.under_col";
- runQueryWithTrigger(query, null, null);
+ List<String> setCmds = new ArrayList<>();
+ setCmds.add("set hive.tez.session.events.print.summary=json");
+ setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ List<String> errCaptureExpect = new ArrayList<>();
+ errCaptureExpect.add("Workload Manager Events Summary");
+ errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+ errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00");
+ errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+ errCaptureExpect.add("\"eventType\" : \"GET\"");
+ errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+ errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+ errCaptureExpect.add("\"name\" : \"move_big_read\"");
+ errCaptureExpect.add("\"name\" : \"slow_query_kill\"");
+ // violation in BI queue
+ errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger + " violated");
+ runQueryWithTrigger(query, setCmds, null, errCaptureExpect);
+ }
+
+ @Test(timeout = 60000)
+ public void testTriggerMoveBackKill() throws Exception {
+ Expression moveExpression1 = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+ Expression moveExpression2 = ExpressionFactory.fromString("SHUFFLE_BYTES > 200");
+ Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 2000");
+ Trigger moveTrigger1 = new ExecutionTrigger("move_big_read", moveExpression1,
+ new Action(Action.Type.MOVE_TO_POOL, "ETL"));
+ Trigger moveTrigger2 = new ExecutionTrigger("move_high", moveExpression2,
+ new Action(Action.Type.MOVE_TO_POOL, "BI"));
+ Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression,
+ new Action(Action.Type.KILL_QUERY));
+ setupTriggers(Lists.newArrayList(moveTrigger1, killTrigger), Lists.newArrayList(moveTrigger2));
+ String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName +
+ " t2 on t1.under_col>=t2.under_col";
+ List<String> setCmds = new ArrayList<>();
+ setCmds.add("set hive.tez.session.events.print.summary=json");
+ setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ List<String> errCaptureExpect = new ArrayList<>();
+ errCaptureExpect.add("Workload Manager Events Summary");
+ errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+ errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00");
+ errCaptureExpect.add("Event: MOVE Pool: BI Cluster %: 80.00");
+ errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00");
+ errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+ errCaptureExpect.add("\"eventType\" : \"GET\"");
+ errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+ errCaptureExpect.add("\"eventType\" : \"MOVE\"");
+ errCaptureExpect.add("\"eventType\" : \"KILL\"");
+ errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+ errCaptureExpect.add("\"name\" : \"move_big_read\"");
+ errCaptureExpect.add("\"name\" : \"slow_query_kill\"");
+ errCaptureExpect.add("\"name\" : \"move_high\"");
+ // violation in BI queue
+ errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger1 + " violated");
+ // violation in ETL queue
+ errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger2 + " violated");
+ // violation in BI queue
+ errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated");
+ runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
}
@Test(timeout = 60000)
@@ -125,7 +196,23 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest {
setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList());
String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col>=t2.under_col";
- runQueryWithTrigger(query, null, killTrigger + " violated");
+ List<String> setCmds = new ArrayList<>();
+ setCmds.add("set hive.tez.session.events.print.summary=json");
+ setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter");
+ List<String> errCaptureExpect = new ArrayList<>();
+ errCaptureExpect.add("Workload Manager Events Summary");
+ errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00");
+ errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00");
+ errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00");
+ errCaptureExpect.add("\"eventType\" : \"GET\"");
+ errCaptureExpect.add("\"eventType\" : \"KILL\"");
+ errCaptureExpect.add("\"eventType\" : \"RETURN\"");
+ errCaptureExpect.add("\"name\" : \"move_big_read\"");
+ errCaptureExpect.add("\"name\" : \"kill_big_read\"");
+ // violation in BI queue
+ errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated");
+ runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 57e1803..6d48783 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.QB;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
@@ -150,18 +150,18 @@ public class Context {
*/
private Map<Integer, DestClausePrefix> insertBranchToNamePrefix = new HashMap<>();
private Operation operation = Operation.OTHER;
- private TriggerContext triggerContext;
+ private WmContext wmContext;
public void setOperation(Operation operation) {
this.operation = operation;
}
- public TriggerContext getTriggerContext() {
- return triggerContext;
+ public WmContext getWmContext() {
+ return wmContext;
}
- public void setTriggerContext(final TriggerContext triggerContext) {
- this.triggerContext = triggerContext;
+ public void setWmContext(final WmContext wmContext) {
+ this.wmContext = wmContext;
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 389a1a6..4d52d74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -113,7 +113,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObje
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
@@ -732,8 +732,8 @@ public class Driver implements CommandProcessor {
} else {
queryStartTime = queryDisplay.getQueryStartTime();
}
- TriggerContext triggerContext = new TriggerContext(queryStartTime, queryId);
- ctx.setTriggerContext(triggerContext);
+ WmContext wmContext = new WmContext(queryStartTime, queryId);
+ ctx.setWmContext(wmContext);
}
private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
index 0509cbc..eb64421 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
@@ -18,18 +18,21 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import org.apache.hive.common.util.Ref;
-
-import java.util.concurrent.TimeoutException;
-
import org.apache.hadoop.security.token.Token;
+import org.apache.hive.common.util.Ref;
import org.apache.tez.common.security.JobTokenIdentifier;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+@JsonSerialize
public interface AmPluginNode {
- public static class AmPluginInfo {
+ class AmPluginInfo {
+ @JsonProperty("amPluginPort")
public final int amPluginPort;
public final Token<JobTokenIdentifier> amPluginToken;
public final String amPluginTokenJobId;
+ @JsonProperty("amHost")
public final String amHost;
AmPluginInfo(String amHost, int amPluginPort,
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
index 94b189b..b16f1c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
@@ -21,13 +21,12 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KillMoveTriggerActionHandler implements TriggerActionHandler {
+public class KillMoveTriggerActionHandler implements TriggerActionHandler<WmTezSession> {
private static final Logger LOG = LoggerFactory.getLogger(KillMoveTriggerActionHandler.class);
private final WorkloadManager wm;
@@ -36,31 +35,20 @@ public class KillMoveTriggerActionHandler implements TriggerActionHandler {
}
@Override
- public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
- TezSessionState sessionState;
- Map<WmTezSession, Future<Boolean>> moveFutures = new HashMap<>(queriesViolated.size());
- for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+ public void applyAction(final Map<WmTezSession, Trigger> queriesViolated) {
+ Map<WmTezSession, Future<Boolean>> moveFutures = new HashMap<>();
+ Map<WmTezSession, Future<Boolean>> killFutures = new HashMap<>();
+ for (Map.Entry<WmTezSession, Trigger> entry : queriesViolated.entrySet()) {
+ WmTezSession wmTezSession = entry.getKey();
switch (entry.getValue().getAction().getType()) {
case KILL_QUERY:
- sessionState = entry.getKey();
- String queryId = sessionState.getTriggerContext().getQueryId();
- try {
- sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
- } catch (HiveException e) {
- LOG.warn("Unable to kill query {} for trigger violation");
- }
+ Future<Boolean> killFuture = wm.applyKillSessionAsync(wmTezSession, entry.getValue().getViolationMsg());
+ killFutures.put(wmTezSession, killFuture);
break;
case MOVE_TO_POOL:
- sessionState = entry.getKey();
- if (sessionState instanceof WmTezSession) {
- WmTezSession wmTezSession = (WmTezSession) sessionState;
- String destPoolName = entry.getValue().getAction().getPoolName();
- Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
- moveFutures.put(wmTezSession, moveFuture);
- } else {
- throw new RuntimeException("WmTezSession is expected. Got: " + sessionState.getClass().getSimpleName() +
- ". SessionId: " + sessionState.getSessionId());
- }
+ String destPoolName = entry.getValue().getAction().getPoolName();
+ Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
+ moveFutures.put(wmTezSession, moveFuture);
break;
default:
throw new RuntimeException("Unsupported action: " + entry.getValue());
@@ -69,15 +57,28 @@ public class KillMoveTriggerActionHandler implements TriggerActionHandler {
for (Map.Entry<WmTezSession, Future<Boolean>> entry : moveFutures.entrySet()) {
WmTezSession wmTezSession = entry.getKey();
- Future<Boolean> moveFuture = entry.getValue();
+ Future<Boolean> future = entry.getValue();
try {
// block to make sure move happened successfully
- if (moveFuture.get()) {
+ if (future.get()) {
LOG.info("Moved session {} to pool {}", wmTezSession.getSessionId(), wmTezSession.getPoolName());
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Exception while moving session {}", wmTezSession.getSessionId(), e);
}
}
+
+ for (Map.Entry<WmTezSession, Future<Boolean>> entry : killFutures.entrySet()) {
+ WmTezSession wmTezSession = entry.getKey();
+ Future<Boolean> future = entry.getValue();
+ try {
+ // block to make sure kill happened successfully
+ if (future.get()) {
+ LOG.info("Killed session {}", wmTezSession.getSessionId());
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Exception while killing session {}", wmTezSession.getSessionId(), e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 8c60b6f..50d234d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
/**
* Handles only Kill Action.
*/
-public class KillTriggerActionHandler implements TriggerActionHandler {
+public class KillTriggerActionHandler implements TriggerActionHandler<TezSessionState> {
private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class);
@Override
@@ -37,7 +37,7 @@ public class KillTriggerActionHandler implements TriggerActionHandler {
switch (entry.getValue().getAction().getType()) {
case KILL_QUERY:
TezSessionState sessionState = entry.getKey();
- String queryId = sessionState.getTriggerContext().getQueryId();
+ String queryId = sessionState.getWmContext().getQueryId();
try {
KillQuery killQuery = sessionState.getKillQuery();
// if kill query is null then session might have been released to pool or closed already
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 6fa3724..dd879fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -38,7 +38,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.login.LoginException;
-import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
@@ -62,7 +61,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.security.Credentials;
@@ -85,6 +84,9 @@ import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
@@ -95,6 +97,7 @@ import com.google.common.cache.CacheBuilder;
/**
* Holds session state related to Tez
*/
+@JsonSerialize
public class TezSessionState {
protected static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName());
@@ -111,19 +114,24 @@ public class TezSessionState {
private Future<TezClient> sessionFuture;
/** Console used for user feedback during async session opening. */
private LogHelper console;
+ @JsonProperty("sessionId")
private String sessionId;
private final DagUtils utils;
+ @JsonProperty("queueName")
private String queueName;
+ @JsonProperty("defaultQueue")
private boolean defaultQueue = false;
+ @JsonProperty("user")
private String user;
private AtomicReference<String> ownerThread = new AtomicReference<>(null);
private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
+ @JsonProperty("doAsEnabled")
private boolean doAsEnabled;
private boolean isLegacyLlapMode;
- private TriggerContext triggerContext;
+ private WmContext wmContext;
private KillQuery killQuery;
private static final Cache<String, String> shaCache = CacheBuilder.newBuilder().maximumSize(100).build();
@@ -852,12 +860,12 @@ public class TezSessionState {
TezSessionPoolManager.getInstance().destroy(this);
}
- public TriggerContext getTriggerContext() {
- return triggerContext;
+ public WmContext getWmContext() {
+ return wmContext;
}
- public void setTriggerContext(final TriggerContext triggerContext) {
- this.triggerContext = triggerContext;
+ public void setWmContext(final WmContext wmContext) {
+ this.wmContext = wmContext;
}
public void setKillQuery(final KillQuery killQuery) {
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index af77f30..8795cfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -63,7 +61,7 @@ import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -146,8 +144,8 @@ public class TezTask extends Task<TezWork> {
// some DDL task that directly executes a TezTask does not setup Context and hence WmContext.
// Setting queryId is messed up. Some DDL tasks have executionId instead of proper queryId.
String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
- TriggerContext triggerContext = new TriggerContext(System.currentTimeMillis(), queryId);
- ctx.setTriggerContext(triggerContext);
+ WmContext wmContext = new WmContext(System.currentTimeMillis(), queryId);
+ ctx.setWmContext(wmContext);
}
// Need to remove this static hack. But this is the way currently to get a session.
@@ -158,7 +156,6 @@ public class TezTask extends Task<TezWork> {
if (session != null && !session.isOpen()) {
LOG.warn("The session: " + session + " has not been opened");
}
- Set<String> desiredCounters = new HashSet<>();
// We only need a username for UGI to use for groups; getGroups will fetch the groups
// based on Hadoop configuration, as documented at
// https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
@@ -166,15 +163,11 @@ public class TezTask extends Task<TezWork> {
MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
: new MappingInput(ss.getUserName(),
UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
- session = WorkloadManagerFederation.getSession(
- session, conf, mi, getWork().getLlapMode(), desiredCounters);
+ WmContext wmContext = ctx.getWmContext();
+ session = WorkloadManagerFederation.getSession(session, conf, mi, getWork().getLlapMode(), wmContext);
- TriggerContext triggerContext = ctx.getTriggerContext();
- triggerContext.setDesiredCounters(desiredCounters);
- LOG.info("Subscribed to counters: {} for queryId: {}",
- desiredCounters, triggerContext.getQueryId());
+ LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), wmContext.getQueryId());
ss.setTezSession(session);
- session.setTriggerContext(triggerContext);
try {
// jobConf will hold all the configuration for hadoop, tez, and hive
JobConf jobConf = utils.createConfiguration(conf);
@@ -256,12 +249,22 @@ public class TezTask extends Task<TezWork> {
// Currently, reopen on an attempted reuse will take care of that; we cannot tell
// if the session is usable until we try.
// We return this to the pool even if it's unusable; reopen is supposed to handle this.
+ wmContext = ctx.getWmContext();
try {
session.returnToSessionManager();
} catch (Exception e) {
LOG.error("Failed to return session: {} to pool", session, e);
throw e;
}
+
+ if (!conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("none") &&
+ wmContext != null) {
+ if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("json")) {
+ wmContext.printJson(console);
+ } else if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("text")) {
+ wmContext.print(console);
+ }
+ }
}
if (LOG.isInfoEnabled() && counters != null
@@ -585,9 +588,9 @@ public class TezTask extends Task<TezWork> {
console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
+ Arrays.toString(e.getStackTrace()) + " retrying...");
// TODO: this is temporary, need to refactor how reopen is invoked.
- TriggerContext oldCtx = sessionState.getTriggerContext();
+ WmContext oldCtx = sessionState.getWmContext();
sessionState = sessionState.reopen(conf, inputOutputJars);
- sessionState.setTriggerContext(oldCtx);
+ sessionState.setWmContext(oldCtx);
dagClient = sessionState.getSession().submitDAG(dag);
} catch (Exception retryException) {
// we failed to submit after retrying. Destroy session and bail.
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index 5821659..6414f05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,17 +46,17 @@ public class TriggerValidatorRunnable implements Runnable {
final List<TezSessionState> sessions = sessionTriggerProvider.getSessions();
final List<Trigger> triggers = sessionTriggerProvider.getTriggers();
for (TezSessionState sessionState : sessions) {
- TriggerContext triggerContext = sessionState.getTriggerContext();
- if (triggerContext != null && !triggerContext.isQueryCompleted()
- && !triggerContext.getCurrentCounters().isEmpty()) {
- Map<String, Long> currentCounters = triggerContext.getCurrentCounters();
+ WmContext wmContext = sessionState.getWmContext();
+ if (wmContext != null && !wmContext.isQueryCompleted()
+ && !wmContext.getCurrentCounters().isEmpty()) {
+ Map<String, Long> currentCounters = wmContext.getCurrentCounters();
for (Trigger currentTrigger : triggers) {
String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName();
// there could be interval where desired counter value is not populated by the time we make this check
if (currentCounters.containsKey(desiredCounter)) {
long currentCounterValue = currentCounters.get(desiredCounter);
if (currentTrigger.apply(currentCounterValue)) {
- String queryId = sessionState.getTriggerContext().getQueryId();
+ String queryId = sessionState.getWmContext().getQueryId();
if (violatedSessions.containsKey(sessionState)) {
// session already has a violation
Trigger existingTrigger = violatedSessions.get(sessionState);
@@ -84,7 +84,7 @@ public class TriggerValidatorRunnable implements Runnable {
Trigger chosenTrigger = violatedSessions.get(sessionState);
if (chosenTrigger != null) {
- LOG.info("Query: {}. {}. Applying action.", sessionState.getTriggerContext().getQueryId(),
+ LOG.info("Query: {}. {}. Applying action.", sessionState.getWmContext().getQueryId(),
chosenTrigger.getViolationMsg());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
new file mode 100644
index 0000000..33341ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Workload Manager events at query level.
+ */
+@JsonSerialize
+public class WmEvent {
+ private static final Logger LOG = LoggerFactory.getLogger(WmEvent.class);
+ /** Type of WM operation being traced; one WmEvent is recorded per operation. */
+ enum EventType {
+ GET, // get session
+ KILL, // kill query
+ DESTROY, // destroy session
+ RESTART, // restart session
+ RETURN, // return session back to pool
+ MOVE // move session to different pool
+ }
+
+ // snapshot of subset of wm tez session info for printing in events summary
+ @JsonSerialize
+ public static class WmTezSessionInfo {
+ @JsonProperty("sessionId")
+ private final String sessionId;
+ @JsonProperty("poolName")
+ private final String poolName;
+ @JsonProperty("clusterPercent")
+ private final double clusterPercent;
+
+ /** Captures an immutable snapshot of the session's id, pool name and cluster share (as a percentage). */
+ WmTezSessionInfo(WmTezSession wmTezSession) {
+ this.poolName = wmTezSession.getPoolName();
+ this.sessionId = wmTezSession.getSessionId();
+ this.clusterPercent = wmTezSession.getClusterFraction() * 100.0;
+ }
+
+ public String getPoolName() {
+ return poolName;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public double getClusterPercent() {
+ return clusterPercent;
+ }
+
+ @Override
+ public String toString() {
+ return "SessionId: " + sessionId + " Pool: " + poolName + " Cluster %: " + clusterPercent;
+ }
+ }
+
+ // session snapshot is taken lazily in endEvent(), not at construction
+ @JsonProperty("wmTezSessionInfo")
+ private WmTezSessionInfo wmTezSessionInfo;
+ @JsonProperty("eventStartTimestamp")
+ private long eventStartTimestamp;
+ @JsonProperty("eventEndTimestamp")
+ private long eventEndTimestamp;
+ @JsonProperty("eventType")
+ private final EventType eventType;
+ @JsonProperty("elapsedTime")
+ private long elapsedTime;
+
+ /** Creates an event of the given type; the start timestamp is taken at construction time. */
+ WmEvent(final EventType eventType) {
+ this.eventType = eventType;
+ this.eventStartTimestamp = System.currentTimeMillis();
+ }
+
+ public long getEventStartTimestamp() {
+ return eventStartTimestamp;
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ public WmTezSessionInfo getWmTezSessionInfo() {
+ return wmTezSessionInfo;
+ }
+
+ public long getEventEndTimestamp() {
+ return eventEndTimestamp;
+ }
+
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ /**
+ * Ends this event: snapshots the session state, computes the elapsed time and,
+ * if the session carries a WmContext, adds this event to it for the events summary.
+ */
+ public void endEvent(final WmTezSession sessionState) {
+ this.wmTezSessionInfo = new WmTezSessionInfo(sessionState);
+ this.eventEndTimestamp = System.currentTimeMillis();
+ this.elapsedTime = eventEndTimestamp - eventStartTimestamp;
+ WmContext wmContext = sessionState.getWmContext();
+ if (wmContext != null) {
+ wmContext.addWMEvent(this);
+ LOG.info("Added WMEvent: {}", this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "EventType: " + eventType + " EventStartTimestamp: " + eventStartTimestamp + " elapsedTime: " +
+ elapsedTime + " wmTezSessionInfo:" + wmTezSessionInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index d61c531..e78ef44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -21,28 +21,40 @@ package org.apache.hadoop.hive.ql.exec.tez;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.hive.common.util.Ref;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+@JsonSerialize
public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
+ @JsonProperty("poolName")
private String poolName;
+ @JsonProperty("clusterFraction")
private double clusterFraction;
/**
* The reason to kill an AM. Note that this is for the entire session, not just for a query.
* Once set, this can never be unset because you can only kill the session once.
*/
+ @JsonProperty("killReason")
private String killReason = null;
private final Object amPluginInfoLock = new Object();
+ @JsonProperty("amPluginInfo")
private AmPluginInfo amPluginInfo = null;
- private Integer amPluginendpointVersion = null;
+ private Integer amPluginEndpointVersion = null;
private SettableFuture<WmTezSession> amRegistryFuture = null;
private ScheduledFuture<?> timeoutTimer = null;
+ @JsonProperty("queryId")
private String queryId;
+ private SettableFuture<Boolean> returnFuture = null;
private final WorkloadManager wmParent;
@@ -99,12 +111,12 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
synchronized (amPluginInfoLock) {
// Ignore the outdated updates; for the same version, ignore non-null updates because
// we assume that removal is the last thing that happens for any given version.
- if ((amPluginendpointVersion != null) && ((amPluginendpointVersion > ephSeqVersion)
- || (amPluginendpointVersion == ephSeqVersion && info != null))) {
+ if ((amPluginEndpointVersion != null) && ((amPluginEndpointVersion > ephSeqVersion)
+ || (amPluginEndpointVersion == ephSeqVersion && info != null))) {
LOG.info("Ignoring an outdated info update {}: {}", ephSeqVersion, si);
return;
}
- this.amPluginendpointVersion = ephSeqVersion;
+ this.amPluginEndpointVersion = ephSeqVersion;
this.amPluginInfo = info;
if (info != null) {
// Only update someone waiting for info if we have the info.
@@ -123,7 +135,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
@Override
public AmPluginInfo getAmPluginInfo(Ref<Integer> version) {
synchronized (amPluginInfoLock) {
- version.value = amPluginendpointVersion;
+ version.value = amPluginEndpointVersion;
return amPluginInfo;
}
}
@@ -132,7 +144,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
this.poolName = poolName;
}
- String getPoolName() {
+ public String getPoolName() {
return poolName;
}
@@ -145,7 +157,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
this.clusterFraction = 0f;
}
- double getClusterFraction() {
+ public double getClusterFraction() {
return this.clusterFraction;
}
@@ -235,6 +247,20 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
return this.queryId;
}
+ void createAndSetReturnFuture() {
+ this.returnFuture = SettableFuture.create();
+ if (getWmContext() != null) {
+ getWmContext().setReturnEventFuture(returnFuture);
+ }
+ }
+
+ void resolveReturnFuture() {
+ if (returnFuture != null) {
+ returnFuture.set(true);
+ returnFuture = null;
+ }
+ }
+
@Override
public String toString() {
return super.toString() + ", WM state poolName=" + poolName + ", clusterFraction="
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index ecdcf12..dbdbbf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,20 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.collect.Lists;
-
-import java.util.concurrent.ExecutionException;
-
-import java.util.Collection;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -42,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -50,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -68,11 +58,24 @@ import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hive.common.util.Ref;
import org.apache.tez.dag.api.TezConfiguration;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/** Workload management entry point for HS2.
* Note on how this class operates.
@@ -92,6 +95,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private static final char POOL_SEPARATOR = '.';
private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
+ private final ObjectMapper objectMapper;
// Various final services, configs, etc.
private final HiveConf conf;
private final TezSessionPool<WmTezSession> tezAmPool;
@@ -112,6 +116,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private Map<String, PoolState> pools;
private String rpName, defaultPool; // For information only.
private int totalQueryParallelism;
+
/**
* The queries being killed. This is used to sync between the background kill finishing and the
* query finishing and user returning the sessions, which can happen in separate iterations
@@ -211,6 +216,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
wmThread.start();
updateResourcePlanAsync(plan).get(); // Wait for the initial resource plan to be applied.
+
+ objectMapper = new ObjectMapper();
+ objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+ // serialize json based on field annotations only
+ objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
+ .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
+ .withSetterVisibility(JsonAutoDetect.Visibility.NONE));
}
private static int determineQueryParallelism(WMFullResourcePlan plan) {
@@ -393,10 +405,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
KillQuery kq = toKill.getKillQuery();
try {
if (kq != null && queryId != null) {
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
try {
kq.killQuery(queryId, reason);
addKillQueryResult(toKill, true);
+ killCtx.killSessionFuture.set(true);
+ wmEvent.endEvent(toKill);
LOG.debug("Killed " + queryId);
return;
} catch (HiveException ex) {
@@ -423,8 +438,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
toRestart.setQueryId(null);
workPool.submit(() -> {
try {
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.RESTART);
// Note: sessions in toRestart are always in use, so they cannot expire in parallel.
tezAmPool.replaceSession(toRestart, false, null);
+ wmEvent.endEvent(toRestart);
} catch (Exception ex) {
LOG.error("Failed to restart an old session; ignoring", ex);
}
@@ -437,7 +454,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
LOG.info("Closing {} without restart", toDestroy);
workPool.submit(() -> {
try {
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.DESTROY);
toDestroy.close(false);
+ wmEvent.endEvent(toDestroy);
} catch (Exception ex) {
LOG.error("Failed to close an old session; ignoring " + ex.getMessage());
}
@@ -513,9 +532,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
e, sessionToReturn, poolsToRedistribute, true);
switch (rr) {
case OK:
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
if (!wasReturned) {
syncWork.toDestroyNoRestart.add(sessionToReturn);
+ } else {
+ if (sessionToReturn.getWmContext() != null && sessionToReturn.getWmContext().isQueryCompleted()) {
+ sessionToReturn.resolveReturnFuture();
+ }
+ wmEvent.endEvent(sessionToReturn);
}
break;
case NOT_FOUND:
@@ -563,8 +588,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// We could consider delaying the move (when destination capacity is full) until there is claim in src pool.
// May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long
// as possible
+ Map<WmTezSession, WmEvent> recordMoveEvents = new HashMap<>();
for (MoveSession moveSession : e.moveSessions) {
- handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse);
+ handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse, recordMoveEvents);
}
e.moveSessions.clear();
@@ -590,13 +616,21 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
case OK: {
iter.remove();
LOG.debug("Kill query succeeded; returning to the pool: {}", ctx.session);
+ ctx.killSessionFuture.set(true);
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
if (!tezAmPool.returnSessionAsync(ctx.session)) {
syncWork.toDestroyNoRestart.add(ctx.session);
+ } else {
+ if (ctx.session.getWmContext() != null && ctx.session.getWmContext().isQueryCompleted()) {
+ ctx.session.resolveReturnFuture();
+ }
+ wmEvent.endEvent(ctx.session);
}
break;
}
case RESTART_REQUIRED: {
iter.remove();
+ ctx.killSessionFuture.set(true);
LOG.debug("Kill query failed; restarting: {}", ctx.session);
// Note: we assume here the session, before we resolve killQuery result here, is still
// "in use". That is because all the user ops above like return, reopen, etc.
@@ -620,7 +654,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (LOG.isDebugEnabled()) {
LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
}
- processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
+ processPoolChangesOnMasterThread(poolName, hasRequeues);
}
@@ -631,7 +665,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- // 13. Notify tests and global async ops.
+ // 13. To record move events, we need to cluster fraction updates that happens at step 11.
+ for (Map.Entry<WmTezSession, WmEvent> entry : recordMoveEvents.entrySet()) {
+ entry.getValue().endEvent(entry.getKey());
+ }
+
+ // 14. Notify tests and global async ops.
if (e.dumpStateFuture != null) {
List<String> result = new ArrayList<>();
result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
@@ -676,11 +715,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSyncWork syncWork,
- Set<String> poolsToRedistribute, Map<WmTezSession, GetRequest> toReuse) {
+ private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
+ final WmThreadSyncWork syncWork,
+ final HashSet<String> poolsToRedistribute,
+ final Map<WmTezSession, GetRequest> toReuse,
+ final Map<WmTezSession, WmEvent> recordMoveEvents) {
String destPoolName = moveSession.destPool;
LOG.info("Handling move session event: {}", moveSession);
if (validMove(moveSession.srcSession, destPoolName)) {
+ WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
// remove from src pool
RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
moveSession.srcSession, poolsToRedistribute, true);
@@ -692,15 +735,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
moveSession.srcSession, destPoolName, poolsToRedistribute);
if (added != null && added) {
moveSession.future.set(true);
+ recordMoveEvents.put(moveSession.srcSession, moveEvent);
return;
} else {
LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession);
}
} else {
WmTezSession session = moveSession.srcSession;
- resetRemovedSessionToKill(session, toReuse);
- syncWork.toKillQuery.put(session, new KillQueryContext(session, "Destination pool "
- + destPoolName + " is full. Killing query."));
+ KillQueryContext killQueryContext = new KillQueryContext(session, "Destination pool " + destPoolName +
+ " is full. Killing query.");
+ resetAndQueueKill(syncWork.toKillQuery, killQueryContext, toReuse);
}
} else {
LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession);
@@ -808,7 +852,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
case OK:
// If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
PoolState pool = pools.get(poolName);
- SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId());
+ SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext());
// We have just removed the session from the same pool, so don't check concurrency here.
pool.initializingSessions.add(sw);
ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
@@ -918,6 +962,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
totalQueryParallelism += qp;
}
}
+ // TODO: in the current impl, triggers are added to the RP. For Tez, no pool triggers (mappings between trigger name
+ // and pool name) will exist, which means all triggers apply to Tez. For LLAP, pool triggers have to exist for
+ // attaching triggers to specific pools.
+ // For usability,
+ // Provide a way for triggers sharing/inheritance possibly with following modes
+ // ONLY - only to pool
+ // INHERIT - child pools inherit from parent
+ // GLOBAL - all pools inherit
if (e.resourcePlanToApply.isSetTriggers() && e.resourcePlanToApply.isSetPoolTriggers()) {
Map<String, Trigger> triggers = new HashMap<>();
for (WMTrigger trigger : e.resourcePlanToApply.getTriggers()) {
@@ -1036,8 +1088,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
- private void processPoolChangesOnMasterThread(
- String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception {
+ private void processPoolChangesOnMasterThread(String poolName, boolean hasRequeues) throws Exception {
PoolState pool = pools.get(poolName);
if (pool == null) return; // Might be from before the new resource plan.
@@ -1058,7 +1109,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Note that in theory, we are guaranteed to have a session waiting for us here, but
// the expiration, failures, etc. may cause one to be missing pending restart.
// See SessionInitContext javadoc.
- SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId);
+ SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId,
+ queueReq.wmContext);
ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
Futures.addCallback(getFuture, sw);
// It is possible that all the async methods returned on the same thread because the
@@ -1097,8 +1149,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
assert isOk || rr == RemoveSessionResult.IGNORE;
if (!isOk) return;
}
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
if (!tezAmPool.returnSessionAsync(session)) {
syncWork.toDestroyNoRestart.add(session);
+ } else {
+ if (session.getWmContext() != null && session.getWmContext().isQueryCompleted()) {
+ session.resolveReturnFuture();
+ }
+ wmEvent.endEvent(session);
}
}
@@ -1159,10 +1217,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
PoolState destPool = pools.get(destPoolName);
if (destPool != null && destPool.sessions.add(session)) {
session.setPoolName(destPoolName);
+ updateTriggers(session);
poolsToRedistribute.add(destPoolName);
return true;
}
- LOG.error("Session {} was not not added to pool {}", session, destPoolName);
+ LOG.error("Session {} was not added to pool {}", session, destPoolName);
return null;
}
@@ -1188,7 +1247,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return applyRpFuture;
}
- public Future<Boolean> applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) {
+ Future<Boolean> applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) {
currentLock.lock();
MoveSession moveSession;
try {
@@ -1202,28 +1261,42 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return moveSession.future;
}
+ Future<Boolean> applyKillSessionAsync(WmTezSession wmTezSession, String killReason) {
+ KillQueryContext killQueryContext;
+ currentLock.lock();
+ try {
+ killQueryContext = new KillQueryContext(wmTezSession, killReason);
+ resetAndQueueKill(syncWork.toKillQuery, killQueryContext, current.toReuse);
+ LOG.info("Queued session for kill: {}", killQueryContext.session);
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
+ }
+ return killQueryContext.killSessionFuture;
+ }
+
private final static class GetRequest {
- public static final Comparator<GetRequest> ORDER_COMPARATOR = new Comparator<GetRequest>() {
- @Override
- public int compare(GetRequest o1, GetRequest o2) {
- if (o1.order == o2.order) return 0;
- return o1.order < o2.order ? -1 : 1;
- }
+ public static final Comparator<GetRequest> ORDER_COMPARATOR = (o1, o2) -> {
+ if (o1.order == o2.order) return 0;
+ return o1.order < o2.order ? -1 : 1;
};
private final long order;
private final MappingInput mappingInput;
private final SettableFuture<WmTezSession> future;
private WmTezSession sessionToReuse;
private final String queryId;
+ private final WmContext wmContext;
private GetRequest(MappingInput mappingInput, String queryId,
- SettableFuture<WmTezSession> future, WmTezSession sessionToReuse, long order) {
+ SettableFuture<WmTezSession> future, WmTezSession sessionToReuse, long order,
+ final WmContext wmContext) {
assert mappingInput != null;
this.mappingInput = mappingInput;
this.queryId = queryId;
this.future = future;
this.sessionToReuse = sessionToReuse;
this.order = order;
+ this.wmContext = wmContext;
}
@Override
@@ -1232,15 +1305,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- public TezSessionState getSession(
- TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
+ public WmTezSession getSession(
+ TezSessionState session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception {
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
// Note: not actually used for pool sessions; verify some things like doAs are not set.
validateConfig(conf);
String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
SettableFuture<WmTezSession> future = SettableFuture.create();
WmTezSession wmSession = checkSessionForReuse(session);
GetRequest req = new GetRequest(
- input, queryId, future, wmSession, getRequestVersion.incrementAndGet());
+ input, queryId, future, wmSession, getRequestVersion.incrementAndGet(), wmContext);
currentLock.lock();
try {
current.getRequests.add(req);
@@ -1252,7 +1326,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
} finally {
currentLock.unlock();
}
- return future.get();
+ WmTezSession sessionState = future.get();
+ wmEvent.endEvent(sessionState);
+ return sessionState;
}
@Override
@@ -1283,6 +1359,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
resetGlobalTezSession(wmTezSession);
currentLock.lock();
try {
+ wmTezSession.createAndSetReturnFuture();
current.toReturn.add(wmTezSession);
notifyWmThreadUnderLock();
} finally {
@@ -1388,7 +1465,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (sessionConf == null) {
LOG.warn("Session configuration is null for " + wmTezSession);
sessionConf = new HiveConf(conf, WorkloadManager.class);
-
}
// TODO: ideally, we should handle reopen the same way no matter what. However, the cases
// with additional files will have to wait until HIVE-17827 is unfucked, because it's
@@ -1519,23 +1595,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return conf;
}
- public List<String> getTriggerCounterNames(final TezSessionState session) {
- if (session instanceof WmTezSession) {
- WmTezSession wmTezSession = (WmTezSession) session;
- String poolName = wmTezSession.getPoolName();
- PoolState poolState = pools.get(poolName);
- if (poolState != null) {
- List<String> counterNames = new ArrayList<>();
- List<Trigger> triggers = poolState.getTriggers();
- if (triggers != null) {
- for (Trigger trigger : triggers) {
- counterNames.add(trigger.getExpression().getCounterLimit().getName());
- }
- }
- return counterNames;
- }
+ void updateTriggers(final WmTezSession session) {
+ WmContext wmContext = session.getWmContext();
+ String poolName = session.getPoolName();
+ PoolState poolState = pools.get(poolName);
+ if (wmContext != null && poolState != null) {
+ wmContext.addTriggers(poolState.getTriggers());
+ LOG.info("Subscribed to counters: {}", wmContext.getSubscribedCounters());
}
- return null;
}
@Override
@@ -1613,6 +1680,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return sessions;
}
+ public LinkedList<SessionInitContext> getInitializingSessions() {
+ return initializingSessions;
+ }
+
@Override
public String toString() {
return "[" + fullName + ", query parallelism " + queryParallelism
@@ -1625,8 +1696,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
IdentityHashMap<WmTezSession, GetRequest> toReuse,
Map<WmTezSession, KillQueryContext> toKill) {
for (WmTezSession sessionToKill : sessions) {
- resetRemovedSessionToKill(sessionToKill, toReuse);
- toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
+ resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
}
sessions.clear();
for (SessionInitContext initCtx : initializingSessions) {
@@ -1636,8 +1706,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (sessionToKill == null) {
continue; // Async op in progress; the callback will take care of this.
}
- resetRemovedSessionToKill(sessionToKill, toReuse);
- toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
+ resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
}
initializingSessions.clear();
}
@@ -1671,12 +1740,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private SettableFuture<WmTezSession> future;
private SessionInitState state;
private String cancelReason;
+ private WmContext wmContext;
- public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId) {
+ public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId,
+ final WmContext wmContext) {
this.state = SessionInitState.GETTING;
this.future = future;
this.poolName = poolName;
this.queryId = queryId;
+ this.wmContext = wmContext;
}
@Override
@@ -1693,6 +1765,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
session.setPoolName(poolName);
session.setQueueName(yarnQueue);
session.setQueryId(queryId);
+ session.setWmContext(wmContext);
this.session = session;
this.state = SessionInitState.WAITING_FOR_REGISTRY;
break;
@@ -1740,6 +1813,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
session.setPoolName(null);
session.setClusterFraction(0f);
session.setQueryId(null);
+ session.setWmContext(null);
tezAmPool.returnSession(session);
break;
}
@@ -1858,16 +1932,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
* like the session even before we kill it, or the kill fails and the user is happily computing
* away. This class is to collect and make sense of the state around all this.
*/
- private static final class KillQueryContext {
+ static final class KillQueryContext {
+ private SettableFuture<Boolean> killSessionFuture;
private final String reason;
private final WmTezSession session;
// Note: all the fields are only modified by master thread.
private boolean isUserDone = false, isKillDone = false,
hasKillFailed = false, hasUserFailed = false;
- public KillQueryContext(WmTezSession session, String reason) {
+ KillQueryContext(WmTezSession session, String reason) {
this.session = session;
this.reason = reason;
+ this.killSessionFuture = SettableFuture.create();
}
private void handleKillQueryCallback(boolean hasFailed) {
@@ -1912,10 +1988,36 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- private static void resetRemovedSessionToKill(
- WmTezSession sessionToKill, Map<WmTezSession, GetRequest> toReuse) {
- sessionToKill.clearWm();
- GetRequest req = toReuse.remove(sessionToKill);
+ private static void resetRemovedSessionToKill(Map<WmTezSession, KillQueryContext> toKillQuery,
+ KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
+ toKillQuery.put(killQueryContext.session, killQueryContext);
+ killQueryContext.session.clearWm();
+ GetRequest req = toReuse.remove(killQueryContext.session);
+ if (req != null) {
+ req.sessionToReuse = null;
+ }
+ }
+
+ private void resetAndQueueKill(Map<WmTezSession, KillQueryContext> toKillQuery,
+ KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
+
+ WmTezSession toKill = killQueryContext.session;
+ toKillQuery.put(toKill, killQueryContext);
+
+ // The way this works is, a session in WM pool will move back to tez AM pool on a kill and will get
+ // reassigned back to WM pool on GetRequest based on user pool mapping. Only if we remove the session from active
+ // sessions list of its WM pool will the queued GetRequest be processed
+ String poolName = toKill.getPoolName();
+ if (poolName != null) {
+ PoolState poolState = pools.get(poolName);
+ if (poolState != null) {
+ poolState.getSessions().remove(toKill);
+ poolState.getInitializingSessions().remove(toKill);
+ }
+ }
+
+ toKill.clearWm();
+ GetRequest req = toReuse.remove(toKill);
if (req != null) {
req.sessionToReuse = null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
index 0a9fa72..9d56204 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,51 +16,57 @@
* limitations under the License.
*/package org.apache.hadoop.hive.ql.exec.tez;
-import org.slf4j.LoggerFactory;
-
-import org.slf4j.Logger;
-
+import java.util.HashSet;
import java.util.Set;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
+import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class WorkloadManagerFederation {
private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class);
public static TezSessionState getSession(TezSessionState session, HiveConf conf,
- MappingInput input, boolean isUnmanagedLlapMode, Set<String> desiredCounters) throws Exception {
+ MappingInput input, boolean isUnmanagedLlapMode, final WmContext wmContext) throws Exception {
+ Set<String> desiredCounters = new HashSet<>();
// 1. If WM is not present just go to unmanaged.
WorkloadManager wm = WorkloadManager.getInstance();
if (wm == null) {
LOG.debug("Using unmanaged session - WM is not initialized");
- return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
+ return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode, wmContext);
}
// 2. We will ask WM for a preliminary mapping. This allows us to escape to the unmanaged path
// quickly in the common case. It's still possible that resource plan will be updated and
// our preliminary mapping won't work out. We'll handle that below.
if (!wm.isManaged(input)) {
LOG.info("Using unmanaged session - no mapping for " + input);
- return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
+ return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode, wmContext);
}
// 3. Finally, try WM.
try {
// Note: this may just block to wait for a session based on parallelism.
LOG.info("Getting a WM session for " + input);
- TezSessionState result = wm.getSession(session, input, conf);
- desiredCounters.addAll(wm.getTriggerCounterNames(result));
+ WmTezSession result = wm.getSession(session, input, conf, wmContext);
+ result.setWmContext(wmContext);
+ wm.updateTriggers(result);
return result;
} catch (WorkloadManager.NoPoolMappingException ex) {
LOG.info("NoPoolMappingException thrown. Getting an un-managed session..");
- return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode);
+ return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode, wmContext);
}
}
private static TezSessionState getUnmanagedSession(
- TezSessionState session, HiveConf conf, Set<String> desiredCounters, boolean isWorkLlapNode) throws Exception {
+ TezSessionState session, HiveConf conf, Set<String> desiredCounters, boolean isWorkLlapNode,
+ final WmContext wmContext) throws Exception {
TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
session = pm.getSession(session, conf, false, isWorkLlapNode);
desiredCounters.addAll(pm.getTriggerCounterNames());
+ wmContext.setSubscribedCounters(desiredCounters);
+ session.setWmContext(wmContext);
return session;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
index 5bb6bf1..8414c73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
@@ -19,6 +19,6 @@ package org.apache.hadoop.hive.ql.exec.tez.monitoring;
import org.apache.hadoop.hive.ql.session.SessionState;
-interface PrintSummary {
+public interface PrintSummary {
void print(SessionState.LogHelper console);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 3dd4b31..9726af1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.wm.TimeCounterLimit;
-import org.apache.hadoop.hive.ql.wm.TriggerContext;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.hive.ql.wm.VertexCounterLimit;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.tez.common.counters.CounterGroup;
@@ -156,7 +156,7 @@ public class TezJobMonitor {
boolean running = false;
long checkInterval = MIN_CHECK_INTERVAL;
- TriggerContext triggerContext = null;
+ WmContext wmContext = null;
while (true) {
try {
@@ -167,12 +167,12 @@ public class TezJobMonitor {
status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval);
TezCounters dagCounters = status.getDAGCounters();
vertexProgressMap = status.getVertexProgress();
- triggerContext = context.getTriggerContext();
- if (dagCounters != null && triggerContext != null) {
- Set<String> desiredCounters = triggerContext.getDesiredCounters();
+ wmContext = context.getWmContext();
+ if (dagCounters != null && wmContext != null) {
+ Set<String> desiredCounters = wmContext.getSubscribedCounters();
if (desiredCounters != null && !desiredCounters.isEmpty()) {
Map<String, Long> currentCounters = getCounterValues(dagCounters, vertexProgressMap, desiredCounters, done);
- triggerContext.setCurrentCounters(currentCounters);
+ wmContext.setCurrentCounters(currentCounters);
}
}
DAGStatus.State state = status.getState();
@@ -234,8 +234,8 @@ public class TezJobMonitor {
break;
}
}
- if (triggerContext != null && done) {
- triggerContext.setQueryCompleted(true);
+ if (wmContext != null && done) {
+ wmContext.setQueryCompleted(true);
}
} catch (Exception e) {
console.printInfo("Exception: " + e.getMessage());
@@ -263,13 +263,13 @@ public class TezJobMonitor {
} else {
console.printInfo("Retrying...");
}
- if (triggerContext != null && done) {
- triggerContext.setQueryCompleted(true);
+ if (wmContext != null && done) {
+ wmContext.setQueryCompleted(true);
}
} finally {
if (done) {
- if (triggerContext != null && done) {
- triggerContext.setQueryCompleted(true);
+ if (wmContext != null && done) {
+ wmContext.setQueryCompleted(true);
}
if (rc != 0 && status != null) {
for (String diag : status.getDiagnostics()) {
@@ -324,7 +324,7 @@ public class TezJobMonitor {
if (!done) {
counterName = TimeCounterLimit.TimeCounter.ELAPSED_TIME.name();
if (desiredCounters.contains(counterName)) {
- updatedCounters.put(counterName, context.getTriggerContext().getElapsedTime());
+ updatedCounters.put(counterName, context.getWmContext().getElapsedTime());
}
counterName = TimeCounterLimit.TimeCounter.EXECUTION_TIME.name();
@@ -351,6 +351,7 @@ public class TezJobMonitor {
new LLAPioSummary(progressMap, dagClient).print(console);
new FSCountersSummary(progressMap, dagClient).print(console);
}
+
console.printInfo("");
}
}