You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/12/08 18:20:05 UTC
hive git commit: HIVE-15149: Add additional information to ATSHook for Tez UI (Jason Dere, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master b4980afd3 -> 84b7fc5bd
HIVE-15149: Add additional information to ATSHook for Tez UI (Jason Dere, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/84b7fc5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/84b7fc5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/84b7fc5b
Branch: refs/heads/master
Commit: 84b7fc5bdc011542fe7f7a8c709fdff710e00d9a
Parents: b4980af
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Dec 8 10:19:27 2016 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Dec 8 10:19:27 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 4 +-
.../apache/hadoop/hive/ql/hooks/ATSHook.java | 118 +++++++++++++++++--
.../hadoop/hive/ql/hooks/HookContext.java | 38 +++++-
3 files changed, 147 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/84b7fc5b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 757c60c..79e95cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -23,6 +23,7 @@ import java.io.DataInput;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Serializable;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -1731,7 +1732,8 @@ public class Driver implements CommandProcessor {
SessionState ss = SessionState.get();
hookContext = new HookContext(plan, queryState, ctx.getPathToCS(), ss.getUserName(),
- ss.getUserIpAddress(), operationId, ss.getSessionId());
+ ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(),
+ Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger);
hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/84b7fc5b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 8ee5c04..0b3c419 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -17,7 +17,14 @@
*/
package org.apache.hadoop.hive.ql.hooks;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -29,9 +36,10 @@ import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
import org.apache.hadoop.hive.ql.plan.ExplainWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -59,11 +67,23 @@ public class ATSHook implements ExecuteWithHookContext {
private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
private enum OtherInfoTypes {
- QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION
+ QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION,
+ CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF,
};
- private enum PrimaryFilterTypes { user, requestuser, operationid };
+ private enum ExecutionMode {
+ MR, TEZ, LLAP, SPARK, NONE
+ };
+ private enum PrimaryFilterTypes {
+ user, requestuser, operationid, executionmode, tablesread, tableswritten, queue
+ };
+
private static final int WAIT_TIME = 3;
+ private static final String[] PERF_KEYS = new String[] {
+ PerfLogger.PARSE, PerfLogger.COMPILE, PerfLogger.ANALYZE, PerfLogger.OPTIMIZER,
+ PerfLogger.GET_SPLITS, PerfLogger.RUN_TASKS,
+ };
+
public ATSHook() {
synchronized(LOCK) {
if (executor == null) {
@@ -139,14 +159,26 @@ public class ATSHook implements ExecuteWithHookContext {
String query = plan.getQueryStr();
JSONObject explainPlan = explain.getJSONPlan(null, work);
String logID = conf.getLogIdVar(hookContext.getSessionId());
- fireAndForget(conf, createPreHookEvent(queryId, query, explainPlan, queryStartTime,
- user, requestuser, numMrJobs, numTezJobs, opId, logID));
+ List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
+ List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
+ String executionMode = getExecutionMode(plan).name();
+ String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+ if (hiveInstanceAddress == null) {
+ hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+ }
+ String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
+ fireAndForget(conf,
+ createPreHookEvent(queryId, query, explainPlan, queryStartTime,
+ user, requestuser, numMrJobs, numTezJobs, opId,
+ hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
+ logID, hookContext.getThreadId(), executionMode,
+ tablesRead, tablesWritten, conf));
break;
case POST_EXEC_HOOK:
- fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId));
+ fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
break;
case ON_FAILURE_HOOK:
- fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId));
+ fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
break;
default:
//ignore
@@ -159,9 +191,45 @@ public class ATSHook implements ExecuteWithHookContext {
});
}
+ protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
+ List<String> tableNames = new ArrayList<String>();
+ for (Entity entity : entities) {
+ if (entity.getType() == Entity.Type.TABLE) {
+ tableNames.add(entity.getTable().getDbName() + "." + entity.getTable().getTableName());
+ }
+ }
+ return tableNames;
+ }
+
+ protected ExecutionMode getExecutionMode(QueryPlan plan) {
+ int numMRJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
+ int numSparkJobs = Utilities.getSparkTasks(plan.getRootTasks()).size();
+ int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
+
+ ExecutionMode mode = ExecutionMode.MR;
+ if (0 == (numMRJobs + numSparkJobs + numTezJobs)) {
+ mode = ExecutionMode.NONE;
+ } else if (numSparkJobs > 0) {
+ return ExecutionMode.SPARK;
+ } else if (numTezJobs > 0) {
+ mode = ExecutionMode.TEZ;
+ // Need to go in and check if any of the tasks is running in LLAP mode.
+ for (TezTask tezTask : Utilities.getTezTasks(plan.getRootTasks())) {
+ if (tezTask.getWork().getLlapMode()) {
+ mode = ExecutionMode.LLAP;
+ break;
+ }
+ }
+ }
+
+ return mode;
+ }
+
TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
- String logID) throws Exception {
+ String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType,
+ String logID, String threadId, String executionMode,
+ List<String> tablesRead, List<String> tablesWritten, HiveConf conf) throws Exception {
JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
queryObj.put("queryText", query);
@@ -173,16 +241,32 @@ public class ATSHook implements ExecuteWithHookContext {
LOG.debug("Operation id: <" + opId + ">");
}
+ conf.stripHiddenConfigurations(conf);
+ Map<String, String> confMap = new HashMap<String, String>();
+ for (Map.Entry<String, String> setting : conf) {
+ confMap.put(setting.getKey(), setting.getValue());
+ }
+ JSONObject confObj = new JSONObject((Map) confMap);
+
TimelineEntity atsEntity = new TimelineEntity();
atsEntity.setEntityId(queryId);
atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
+ atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode);
+ atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("mapreduce.job.queuename"));
if (opId != null) {
atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
}
+ for (String tabName : tablesRead) {
+ atsEntity.addPrimaryFilter(PrimaryFilterTypes.tablesread.name(), tabName);
+ }
+ for (String tabName : tablesWritten) {
+ atsEntity.addPrimaryFilter(PrimaryFilterTypes.tableswritten.name(), tabName);
+ }
+
TimelineEvent startEvt = new TimelineEvent();
startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name());
startEvt.setTimestamp(startTime);
@@ -192,13 +276,20 @@ public class ATSHook implements ExecuteWithHookContext {
atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0);
atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
- atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), Thread.currentThread().getName());
+ atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadId);
atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
+ if (clientIpAddress != null) {
+ atsEntity.addOtherInfo(OtherInfoTypes.CLIENT_IP_ADDRESS.name(), clientIpAddress);
+ }
+ atsEntity.addOtherInfo(OtherInfoTypes.HIVE_ADDRESS.name(), hiveInstanceAddress);
+ atsEntity.addOtherInfo(OtherInfoTypes.HIVE_INSTANCE_TYPE.name(), hiveInstanceType);
+ atsEntity.addOtherInfo(OtherInfoTypes.CONF.name(), confObj.toString());
+
return atsEntity;
}
TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success,
- String opId) {
+ String opId, PerfLogger perfLogger) throws Exception {
LOG.info("Received post-hook notification for :" + queryId);
TimelineEntity atsEntity = new TimelineEntity();
@@ -217,6 +308,13 @@ public class ATSHook implements ExecuteWithHookContext {
atsEntity.addOtherInfo(OtherInfoTypes.STATUS.name(), success);
+ // Perf times
+ JSONObject perfObj = new JSONObject(new LinkedHashMap<>());
+ for (String key : perfLogger.getEndTimes().keySet()) {
+ perfObj.put(key, perfLogger.getDuration(key));
+ }
+ atsEntity.addOtherInfo(OtherInfoTypes.PERF.name(), perfObj.toString());
+
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/84b7fc5b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 3b4cc2c..c94100c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
@@ -57,15 +58,19 @@ public class HookContext {
private Throwable exception;
final private Map<String, ContentSummary> inputPathToContentSummary;
private final String ipAddress;
+ private final String hiveInstanceAddress;
private final String userName;
// unique id set for operation when run from HS2, base64 encoded value of
// TExecuteStatementResp.TOperationHandle.THandleIdentifier.guid
private final String operationId;
private final String sessionId;
+ private final String threadId;
+ private boolean isHiveServerQuery;
+ private PerfLogger perfLogger;
public HookContext(QueryPlan queryPlan, QueryState queryState,
- Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress,
- String operationId, String sessionId) throws Exception {
+ Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress, String hiveInstanceAddress,
+ String operationId, String sessionId, String threadId, boolean isHiveServerQuery, PerfLogger perfLogger) throws Exception {
this.queryPlan = queryPlan;
this.queryState = queryState;
this.conf = queryState.getConf();
@@ -82,8 +87,12 @@ public class HookContext {
}
this.userName = userName;
this.ipAddress = ipAddress;
+ this.hiveInstanceAddress = hiveInstanceAddress;
this.operationId = operationId;
this.sessionId = sessionId;
+ this.threadId = threadId;
+ this.isHiveServerQuery = isHiveServerQuery;
+ this.perfLogger = perfLogger;
}
public QueryPlan getQueryPlan() {
@@ -170,6 +179,10 @@ public class HookContext {
return this.ipAddress;
}
+ public String getHiveInstanceAddress() {
+ return hiveInstanceAddress;
+ }
+
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
@@ -205,4 +218,25 @@ public class HookContext {
public String getSessionId() {
return sessionId;
}
+
+ public String getThreadId() {
+ return threadId;
+ }
+
+ public boolean isHiveServerQuery() {
+ return isHiveServerQuery;
+ }
+
+ public void setHiveServerQuery(boolean isHiveServerQuery) {
+ this.isHiveServerQuery = isHiveServerQuery;
+ }
+
+ public PerfLogger getPerfLogger() {
+ return perfLogger;
+ }
+
+ public void setPerfLogger(PerfLogger perfLogger) {
+ this.perfLogger = perfLogger;
+ }
+
}