You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/12/08 18:20:05 UTC

hive git commit: HIVE-15149: Add additional information to ATSHook for Tez UI (Jason Dere, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master b4980afd3 -> 84b7fc5bd


HIVE-15149: Add additional information to ATSHook for Tez UI (Jason Dere, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/84b7fc5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/84b7fc5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/84b7fc5b

Branch: refs/heads/master
Commit: 84b7fc5bdc011542fe7f7a8c709fdff710e00d9a
Parents: b4980af
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Dec 8 10:19:27 2016 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Dec 8 10:19:27 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   4 +-
 .../apache/hadoop/hive/ql/hooks/ATSHook.java    | 118 +++++++++++++++++--
 .../hadoop/hive/ql/hooks/HookContext.java       |  38 +++++-
 3 files changed, 147 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/84b7fc5b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 757c60c..79e95cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -23,6 +23,7 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1731,7 +1732,8 @@ public class Driver implements CommandProcessor {
 
       SessionState ss = SessionState.get();
       hookContext = new HookContext(plan, queryState, ctx.getPathToCS(), ss.getUserName(),
-          ss.getUserIpAddress(), operationId, ss.getSessionId());
+          ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(),
+          Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger);
       hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
 
       for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/84b7fc5b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 8ee5c04..0b3c419 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -17,7 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.hooks;
 
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -29,9 +36,10 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -59,11 +67,23 @@ public class ATSHook implements ExecuteWithHookContext {
   private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
 
   private enum OtherInfoTypes {
-    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION
+    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION,
+    CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF,
   };
-  private enum PrimaryFilterTypes { user, requestuser, operationid };
+  private enum ExecutionMode {
+    MR, TEZ, LLAP, SPARK, NONE
+  };
+  private enum PrimaryFilterTypes {
+    user, requestuser, operationid, executionmode, tablesread, tableswritten, queue
+  };
+
   private static final int WAIT_TIME = 3;
 
+  private static final String[] PERF_KEYS = new String[] {
+    PerfLogger.PARSE, PerfLogger.COMPILE, PerfLogger.ANALYZE, PerfLogger.OPTIMIZER,
+    PerfLogger.GET_SPLITS, PerfLogger.RUN_TASKS,
+  };
+
   public ATSHook() {
     synchronized(LOCK) {
       if (executor == null) {
@@ -139,14 +159,26 @@ public class ATSHook implements ExecuteWithHookContext {
               String query = plan.getQueryStr();
               JSONObject explainPlan = explain.getJSONPlan(null, work);
               String logID = conf.getLogIdVar(hookContext.getSessionId());
-              fireAndForget(conf, createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                user, requestuser, numMrJobs, numTezJobs, opId, logID));
+              List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
+              List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
+              String executionMode = getExecutionMode(plan).name();
+              String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+              if (hiveInstanceAddress == null) {
+                hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+              }
+              String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
+              fireAndForget(conf,
+                  createPreHookEvent(queryId, query, explainPlan, queryStartTime,
+                      user, requestuser, numMrJobs, numTezJobs, opId,
+                      hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
+                      logID, hookContext.getThreadId(), executionMode,
+                      tablesRead, tablesWritten, conf));
               break;
             case POST_EXEC_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
               break;
             case ON_FAILURE_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
               break;
             default:
               //ignore
@@ -159,9 +191,45 @@ public class ATSHook implements ExecuteWithHookContext {
       });
   }
 
+  protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
+    List<String> tableNames = new ArrayList<String>();
+    for (Entity entity : entities) {
+      if (entity.getType() == Entity.Type.TABLE) {
+        tableNames.add(entity.getTable().getDbName() + "." + entity.getTable().getTableName());
+      }
+    }
+    return tableNames;
+  }
+
+  protected ExecutionMode getExecutionMode(QueryPlan plan) {
+    int numMRJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
+    int numSparkJobs = Utilities.getSparkTasks(plan.getRootTasks()).size();
+    int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
+
+    ExecutionMode mode = ExecutionMode.MR;
+    if (0 == (numMRJobs + numSparkJobs + numTezJobs)) {
+      mode = ExecutionMode.NONE;
+    } else if (numSparkJobs > 0) {
+      return ExecutionMode.SPARK;
+    } else if (numTezJobs > 0) {
+      mode = ExecutionMode.TEZ;
+      // Need to go in and check if any of the tasks is running in LLAP mode.
+      for (TezTask tezTask : Utilities.getTezTasks(plan.getRootTasks())) {
+        if (tezTask.getWork().getLlapMode()) {
+          mode = ExecutionMode.LLAP;
+          break;
+        }
+      }
+    }
+
+    return mode;
+  }
+
   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
       long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
-      String logID) throws Exception {
+      String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType,
+      String logID, String threadId, String executionMode,
+      List<String> tablesRead, List<String> tablesWritten, HiveConf conf) throws Exception {
 
     JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
     queryObj.put("queryText", query);
@@ -173,16 +241,32 @@ public class ATSHook implements ExecuteWithHookContext {
       LOG.debug("Operation id: <" + opId + ">");
     }
 
+    conf.stripHiddenConfigurations(conf);
+    Map<String, String> confMap = new HashMap<String, String>();
+    for (Map.Entry<String, String> setting : conf) {
+      confMap.put(setting.getKey(), setting.getValue());
+    }
+    JSONObject confObj = new JSONObject((Map) confMap);
+
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(queryId);
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode);
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("mapreduce.job.queuename"));
 
     if (opId != null) {
       atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
     }
 
+    for (String tabName : tablesRead) {
+      atsEntity.addPrimaryFilter(PrimaryFilterTypes.tablesread.name(), tabName);
+    }
+    for (String tabName : tablesWritten) {
+      atsEntity.addPrimaryFilter(PrimaryFilterTypes.tableswritten.name(), tabName);
+    }
+
     TimelineEvent startEvt = new TimelineEvent();
     startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name());
     startEvt.setTimestamp(startTime);
@@ -192,13 +276,20 @@ public class ATSHook implements ExecuteWithHookContext {
     atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
-    atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), Thread.currentThread().getName());
+    atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadId);
     atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
+    if (clientIpAddress != null) {
+      atsEntity.addOtherInfo(OtherInfoTypes.CLIENT_IP_ADDRESS.name(), clientIpAddress);
+    }
+    atsEntity.addOtherInfo(OtherInfoTypes.HIVE_ADDRESS.name(), hiveInstanceAddress);
+    atsEntity.addOtherInfo(OtherInfoTypes.HIVE_INSTANCE_TYPE.name(), hiveInstanceType);
+    atsEntity.addOtherInfo(OtherInfoTypes.CONF.name(), confObj.toString());
+
     return atsEntity;
   }
 
   TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success,
-      String opId) {
+      String opId, PerfLogger perfLogger) throws Exception {
     LOG.info("Received post-hook notification for :" + queryId);
 
     TimelineEntity atsEntity = new TimelineEntity();
@@ -217,6 +308,13 @@ public class ATSHook implements ExecuteWithHookContext {
 
     atsEntity.addOtherInfo(OtherInfoTypes.STATUS.name(), success);
 
+    // Perf times
+    JSONObject perfObj = new JSONObject(new LinkedHashMap<>());
+    for (String key : perfLogger.getEndTimes().keySet()) {
+      perfObj.put(key, perfLogger.getDuration(key));
+    }
+    atsEntity.addOtherInfo(OtherInfoTypes.PERF.name(), perfObj.toString());
+
     return atsEntity;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/84b7fc5b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 3b4cc2c..c94100c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
@@ -57,15 +58,19 @@ public class HookContext {
   private Throwable exception;
   final private Map<String, ContentSummary> inputPathToContentSummary;
   private final String ipAddress;
+  private final String hiveInstanceAddress;
   private final String userName;
   // unique id set for operation when run from HS2, base64 encoded value of
   // TExecuteStatementResp.TOperationHandle.THandleIdentifier.guid
   private final String operationId;
   private final String sessionId;
+  private final String threadId;
+  private boolean isHiveServerQuery;
+  private PerfLogger perfLogger;
 
   public HookContext(QueryPlan queryPlan, QueryState queryState,
-      Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress,
-      String operationId, String sessionId) throws Exception {
+      Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress, String hiveInstanceAddress,
+      String operationId, String sessionId, String threadId, boolean isHiveServerQuery, PerfLogger perfLogger) throws Exception {
     this.queryPlan = queryPlan;
     this.queryState = queryState;
     this.conf = queryState.getConf();
@@ -82,8 +87,12 @@ public class HookContext {
     }
     this.userName = userName;
     this.ipAddress = ipAddress;
+    this.hiveInstanceAddress = hiveInstanceAddress;
     this.operationId = operationId;
     this.sessionId = sessionId;
+    this.threadId = threadId;
+    this.isHiveServerQuery = isHiveServerQuery;
+    this.perfLogger = perfLogger;
   }
 
   public QueryPlan getQueryPlan() {
@@ -170,6 +179,10 @@ public class HookContext {
     return this.ipAddress;
   }
 
+  public String getHiveInstanceAddress() {
+    return hiveInstanceAddress;
+  }
+
   public void setErrorMessage(String errorMessage) {
     this.errorMessage = errorMessage;
   }
@@ -205,4 +218,25 @@ public class HookContext {
   public String getSessionId() {
     return sessionId;
   }
+
+  public String getThreadId() {
+    return threadId;
+  }
+
+  public boolean isHiveServerQuery() {
+    return isHiveServerQuery;
+  }
+
+  public void setHiveServerQuery(boolean isHiveServerQuery) {
+    this.isHiveServerQuery = isHiveServerQuery;
+  }
+
+  public PerfLogger getPerfLogger() {
+    return perfLogger;
+  }
+
+  public void setPerfLogger(PerfLogger perfLogger) {
+    this.perfLogger = perfLogger;
+  }
+
 }