Posted to commits@hive.apache.org by om...@apache.org on 2017/05/20 04:10:06 UTC

[2/3] hive git commit: HIVE-12387: Issues in Hive's use of CallerContext (Vikram Dixit K, reviewed by Gunther Hagleitner)

HIVE-12387: Issues in Hive's use of CallerContext (Vikram Dixit K, reviewed by Gunther Hagleitner)

Conflicts:
	cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
	ql/src/java/org/apache/hadoop/hive/ql/Driver.java
	ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
	ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
	ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
	ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
	service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
	shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4b676cd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4b676cd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4b676cd

Branch: refs/heads/branch-2.2
Commit: f4b676cd62b304b1926615ec3b53bd65dac7c3ad
Parents: 7ea8e1c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Dec 16 10:57:35 2016 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri May 19 16:22:52 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 39 ++++++++++++++++----
 .../org/apache/hadoop/hive/ql/QueryPlan.java    | 35 ++++++++++++++++--
 .../apache/hadoop/hive/ql/hooks/ATSHook.java    | 25 ++++++++-----
 .../parse/TestUpdateDeleteSemanticAnalyzer.java |  2 +-
 .../service/cli/session/HiveSessionImpl.java    | 10 ++++-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java | 16 +++++++-
 .../apache/hadoop/hive/shims/HadoopShims.java   | 10 +++++
 7 files changed, 114 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
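
Taken together, the diff below threads a Hadoop caller context through the whole query lifecycle: HiveSessionImpl sets the context to the session id while a handler thread holds a session, Driver narrows it to the query id for the duration of compile and execute and restores the previous value afterwards, QueryPlan carries the session id, thread name, and the user-provided trace id (HiveConf.ConfVars.HIVE_LOG_TRACE_ID) so that ATSHook can publish them to the YARN timeline server, and Hadoop23Shims implements the new shim methods on top of org.apache.hadoop.ipc.CallerContext.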


http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 08bd040..c8e2c4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -112,6 +112,8 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
@@ -396,12 +398,13 @@ public class Driver implements CommandProcessor {
     }
 
     if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
-      close();
+      closeInProcess(false);
     }
+
     if (isInterrupted()) {
       return handleInterruption("at beginning of compilation."); //indicate if need clean resource
     }
-    
+
     if (resetTaskIds) {
       TaskFactory.resetId();
     }
@@ -416,6 +419,8 @@ public class Driver implements CommandProcessor {
 
     SessionState.get().setupQueryCurrentTimestamp();
 
+    String originalCallerContext = "";
+    HadoopShims shim = ShimLoader.getHadoopShims();
     boolean compileError = false;
     try {
       // Initialize the transaction manager.  This must be done before analyze is called.
@@ -437,6 +442,13 @@ public class Driver implements CommandProcessor {
       };
       ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
 
+      // we set the hadoop caller context to the query id as soon as we have one.
+      // initially, the caller context is the session id (when creating temp directories)
+      originalCallerContext = shim.getHadoopCallerContext();
+      LOG.info("We are setting the hadoop caller context from " + originalCallerContext + " to "
+          + queryId);
+      shim.setHadoopQueryContext(queryId);
+
       if (isInterrupted()) {
         return handleInterruption("before parsing and analysing the query");
       }
@@ -444,7 +456,7 @@ public class Driver implements CommandProcessor {
       if (ctx == null) {
         ctx = new Context(conf);
       }
-      
+
       ctx.setTryCount(getTryCount());
       ctx.setCmd(command);
       ctx.setHDFSCleanup(true);
@@ -502,7 +514,9 @@ public class Driver implements CommandProcessor {
       // get the output schema
       schema = getSchema(sem, conf);
       plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
-        queryState.getHiveOperation(), schema);
+        queryState.getHiveOperation(), schema,
+        SessionState.get().getSessionId(), Thread.currentThread().getName(),
+        HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_LOG_TRACE_ID));
 
       conf.setQueryString(queryStr);
 
@@ -598,6 +612,10 @@ public class Driver implements CommandProcessor {
       } else {
         LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
       }
+
+      // reset the caller id.
+      LOG.info("We are resetting the hadoop caller context to " + originalCallerContext);
+      shim.setHadoopCallerContext(originalCallerContext);
     }
   }
 
@@ -687,7 +705,7 @@ public class Driver implements CommandProcessor {
     }
 
     // The following union operation returns a union, which traverses over the
-    // first set once and then  then over each element of second set, in order, 
+    // first set once and then  then over each element of second set, in order,
     // that is not contained in first. This means it doesn't replace anything
     // in first set, and would preserve the WriteType in WriteEntity in first
     // set in case of outputs list.
@@ -1009,7 +1027,7 @@ public class Driver implements CommandProcessor {
     conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
     if(plan.getFetchTask() != null) {
       /**
-       * This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which 
+       * This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which
        * initializes JobConf in FetchOperator before recordValidTxns() but this has to be done
        * after locks are acquired to avoid race conditions in ACID.
        */
@@ -1663,8 +1681,13 @@ public class Driver implements CommandProcessor {
     maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
 
     HookContext hookContext = null;
+
+    String originalCallerContext = "";
     boolean executionError = false;
     try {
+      LOG.info("Setting caller context to query id " + queryId);
+      originalCallerContext = ShimLoader.getHadoopShims().getHadoopCallerContext();
+      ShimLoader.getHadoopShims().setHadoopQueryContext(queryId);
       LOG.info("Executing command(queryId=" + queryId + "): " + queryStr);
       // compile and execute can get called from different threads in case of HS2
       // so clear timing in this thread's Hive object before proceeding.
@@ -1908,6 +1931,8 @@ public class Driver implements CommandProcessor {
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (12);
     } finally {
+      LOG.info("Resetting the caller context to " + originalCallerContext);
+      ShimLoader.getHadoopShims().setHadoopCallerContext(originalCallerContext);
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().endQuery(queryId);
       }
@@ -2348,7 +2373,7 @@ public class Driver implements CommandProcessor {
     this.operationId = opId;
   }
 
-  /** 
+  /**
    * Resets QueryState to get new queryId on Driver reuse.
    */
   public void resetQueryState() {
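
Both compile() and execute() above follow the same save/set/restore discipline: capture whatever caller context is current (normally the session id set by HiveSessionImpl), replace it with the query id, and put the old value back in the finally block so the query id cannot leak onto the next query run by a reused HS2 handler thread. A minimal sketch of that discipline, assuming the shim methods from this patch (setHadoopQueryContext is called above but declared in another part of the patch series):

    import java.util.concurrent.Callable;

    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;

    final class QueryContextScope {
      // Run one query phase (compile or execute) with the caller context
      // temporarily switched to the query id, restoring it even on failure.
      static <T> T runWithQueryId(String queryId, Callable<T> phase) throws Exception {
        HadoopShims shim = ShimLoader.getHadoopShims();
        String original = shim.getHadoopCallerContext();  // e.g. the session id
        shim.setHadoopQueryContext(queryId);              // tag HDFS/YARN calls with the query id
        try {
          return phase.call();
        } finally {
          shim.setHadoopCallerContext(original);          // restore on success and on error
        }
      }
    }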

http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index e8c8ae6..d05e53b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -106,6 +106,9 @@ public class QueryPlan implements Serializable {
   private transient Long queryStartTime;
   private final HiveOperation operation;
   private Boolean autoCommitValue;
+  private String sessionId;
+  private String threadName;
+  private String userProvidedContext;
 
   public QueryPlan() {
     this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
@@ -113,7 +116,8 @@ public class QueryPlan implements Serializable {
   }
 
   public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
-                  HiveOperation operation, Schema resultSchema) {
+                   HiveOperation operation, Schema resultSchema,
+                   String sessionId, String threadName, String userProvidedContext) {
     this.queryString = queryString;
 
     rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
@@ -136,6 +140,9 @@ public class QueryPlan implements Serializable {
     this.operation = operation;
     this.autoCommitValue = sem.getAutoCommitValue();
     this.resultSchema = resultSchema;
+    this.setSessionId(sessionId);
+    this.setThreadName(threadName);
+    this.setUserProvidedContext(userProvidedContext);
   }
 
   public String getQueryStr() {
@@ -609,7 +616,6 @@ public class QueryPlan implements Serializable {
     try {
       q.write(oprot);
     } catch (TException e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
       return q.toString();
     }
@@ -623,7 +629,6 @@ public class QueryPlan implements Serializable {
     try {
       q.write(oprot);
     } catch (TException e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
       return q.toString();
     }
@@ -802,4 +807,28 @@ public class QueryPlan implements Serializable {
   public Boolean getAutoCommitValue() {
     return autoCommitValue;
   }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public void setSessionId(String sessionId) {
+    this.sessionId = sessionId;
+  }
+
+  public String getThreadName() {
+    return threadName;
+  }
+
+  public void setThreadName(String threadName) {
+    this.threadName = threadName;
+  }
+
+  public String getUserProvidedContext() {
+    return userProvidedContext;
+  }
+
+  public void setUserProvidedContext(String userProvidedContext) {
+    this.userProvidedContext = userProvidedContext;
+  }
 }
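
The three new fields are plain carry-through state: Driver fills them in at plan construction time and ATSHook reads them back when it publishes the query. A hedged sketch of the wiring (the helper method is hypothetical; its arguments are the same objects Driver.compile() already has in scope):

    // Sketch: how the caller-context fields reach the plan. HIVE_LOG_TRACE_ID is
    // a client-settable HiveConf variable, empty unless the client sets it.
    static QueryPlan buildPlan(String queryStr, BaseSemanticAnalyzer sem, long startTime,
        String queryId, QueryState queryState, Schema schema, HiveConf conf) {
      return new QueryPlan(queryStr, sem, startTime, queryId,
          queryState.getHiveOperation(), schema,
          SessionState.get().getSessionId(),
          Thread.currentThread().getName(),
          HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_LOG_TRACE_ID));
    }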

http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 218e0b4..f968da6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -79,8 +79,8 @@ public class ATSHook implements ExecuteWithHookContext {
   private static final String DEFAULT_ATS_DOMAIN = "hive_default_ats";
 
   private enum OtherInfoTypes {
-    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, THREAD_NAME, VERSION,
-    CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF,
+    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, LOG_TRACE_ID, THREAD_NAME, VERSION,
+    CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF, LLAP_APP_ID
   };
   private enum ExecutionMode {
     MR, TEZ, LLAP, SPARK, NONE
@@ -275,8 +275,7 @@ public class ATSHook implements ExecuteWithHookContext {
                                                   String query = plan.getQueryStr();
                                                   JSONObject explainPlan = explain.getJSONPlan(null,
                                                                   work);
-						  String logID = conf.getLogIdVar(hookContext.getSessionId());
-						  List<String> tablesRead = getTablesFromEntitySet(hookContext
+                                                  List<String> tablesRead = getTablesFromEntitySet(hookContext
                                                                   .getInputs());
                                                   List<String> tablesWritten = getTablesFromEntitySet(hookContext
                                                                   .getOutputs());
@@ -297,9 +296,9 @@ public class ATSHook implements ExecuteWithHookContext {
                                                                   hookContext.getIpAddress(),
                                                                   hiveInstanceAddress, hiveInstanceType,
                                                                   hookContext.getSessionId(),
-                                                                  logID,
+                                                                  plan.getUserProvidedContext(),
                                                                   hookContext.getThreadId(), executionMode,
-                                                                  tablesRead, tablesWritten, conf));
+                                                                  tablesRead, tablesWritten, conf, llapId, domainId));
                                                   break;
                                           case POST_EXEC_HOOK:
                                                   fireAndForget(createPostHookEvent(queryId,
@@ -361,8 +360,10 @@ public class ATSHook implements ExecuteWithHookContext {
   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
       long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
       String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType,
-      String sessionID, String logID, String threadId, String executionMode,
-      List<String> tablesRead, List<String> tablesWritten, HiveConf conf) throws Exception {
+      String sessionID, String logTraceId, String threadId, String executionMode,
+      List<String> tablesRead, List<String> tablesWritten, HiveConf conf, ApplicationId llapAppId,
+      String domainId)
+          throws Exception {
 
     JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
     queryObj.put("queryText", query);
@@ -409,8 +410,10 @@ public class ATSHook implements ExecuteWithHookContext {
     atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.SESSION_ID.name(), sessionID);
-    atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
     atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadId);
+    if ((logTraceId != null) && (logTraceId.equals("") == false)) {
+      atsEntity.addOtherInfo(OtherInfoTypes.LOG_TRACE_ID.name(), logTraceId);
+    }
     atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
     if (clientIpAddress != null) {
       atsEntity.addOtherInfo(OtherInfoTypes.CLIENT_IP_ADDRESS.name(), clientIpAddress);
@@ -418,6 +421,10 @@ public class ATSHook implements ExecuteWithHookContext {
     atsEntity.addOtherInfo(OtherInfoTypes.HIVE_ADDRESS.name(), hiveInstanceAddress);
     atsEntity.addOtherInfo(OtherInfoTypes.HIVE_INSTANCE_TYPE.name(), hiveInstanceType);
     atsEntity.addOtherInfo(OtherInfoTypes.CONF.name(), confObj.toString());
+    if (llapAppId != null) {
+      atsEntity.addOtherInfo(OtherInfoTypes.LLAP_APP_ID.name(), llapAppId.toString());
+    }
+    atsEntity.setDomainId(domainId);
 
     return atsEntity;
   }
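
On the ATS side, the old conf-derived INVOKER_INFO log id gives way to an optional LOG_TRACE_ID taken from the plan, and an LLAP_APP_ID is recorded when one exists. The same conditional publication, written with idiomatic null/empty checks (equivalent to the logTraceId.equals("") == false test above):

    // Sketch combining the call site and createPreHookEvent() above: only
    // publish the trace id and LLAP app id when they are actually set.
    String logTraceId = plan.getUserProvidedContext();
    if (logTraceId != null && !logTraceId.isEmpty()) {
      atsEntity.addOtherInfo(OtherInfoTypes.LOG_TRACE_ID.name(), logTraceId);
    }
    if (llapAppId != null) {
      atsEntity.addOtherInfo(OtherInfoTypes.LLAP_APP_ID.name(), llapAppId.toString());
    }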

http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
index a573808..9fa3b25 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
@@ -283,7 +283,7 @@ public class TestUpdateDeleteSemanticAnalyzer {
     // validate the plan
     sem.validate();
 
-    QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null);
+    QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null, "", "", "");
 
     return new ReturnInfo(sem, plan);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 3f9dfdc..4d5cdac 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -355,7 +355,7 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   private synchronized void acquireAfterOpLock(boolean userAccess) {
-    // Need to make sure that the this HiveServer2's session's SessionState is
+    // Need to make sure that this HiveServer2's session's session state is
     // stored in the thread local for the handler thread.
     SessionState.setCurrentSessionState(sessionState);
     sessionState.setForwardedAddresses(SessionManager.getForwardedAddresses());
@@ -365,6 +365,10 @@ public class HiveSessionImpl implements HiveSession {
     }
     // set the thread name with the logging prefix.
     sessionState.updateThreadName();
+    // set the log context for debugging
+    LOG.info("We are setting the hadoop caller context to " + sessionState.getSessionId()
+        + " for thread " + Thread.currentThread().getName());
+    ShimLoader.getHadoopShims().setHadoopCallerContext(sessionState.getSessionId());
     Hive.set(sessionHive);
   }
 
@@ -390,6 +394,10 @@ public class HiveSessionImpl implements HiveSession {
       // can be null in-case of junit tests. skip reset.
       // reset thread name at release time.
       sessionState.resetThreadName();
+      // reset the HDFS caller context.
+      LOG.info("We are resetting the hadoop caller context for thread "
+          + Thread.currentThread().getName());
+      ShimLoader.getHadoopShims().setHadoopCallerContext("");
     }
 
     SessionState.detachSession();
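
At the session layer the bracketing is coarser than in Driver: the caller context becomes the session id for as long as a handler thread holds the session, and is cleared at release by setting it to the empty string (the same value getHadoopCallerContext() reports when nothing is set). A sketch of the pair, assuming a sessionState field as in HiveSessionImpl:

    // Session-scoped caller context, mirroring acquireAfterOpLock() and release() above.
    private void tagHandlerThread() {
      ShimLoader.getHadoopShims().setHadoopCallerContext(sessionState.getSessionId());
    }

    private void untagHandlerThread() {
      ShimLoader.getHadoopShims().setHadoopCallerContext("");  // "" == no caller context
    }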

http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 916a96b..7d31f03 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -1383,7 +1384,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     }
     try {
       Subject origSubject = (Subject) getSubjectMethod.invoke(baseUgi);
-      
+
       Subject subject = new Subject(false, origSubject.getPrincipals(),
           cloneCredentials(origSubject.getPublicCredentials()),
           cloneCredentials(origSubject.getPrivateCredentials()));
@@ -1401,5 +1402,16 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     }
     return set;
   }
-  
+
+  public void setHadoopCallerContext(String callerContext) {
+    CallerContext.setCurrent(new CallerContext.Builder(callerContext).build());
+  }
+
+  @Override
+  public String getHadoopCallerContext() {
+    if (CallerContext.getCurrent() == null) {
+      return "";
+    }
+    return CallerContext.getCurrent().getContext();
+  }
 }
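
These shim methods are thin wrappers over org.apache.hadoop.ipc.CallerContext, which Hadoop propagates with each RPC from the calling thread; when the cluster runs with hadoop.caller.context.enabled=true, the value appears in the HDFS audit log, which is what makes the query id visible to cluster operators. A hedged usage sketch (the query id literal is invented):

    import org.apache.hadoop.ipc.CallerContext;

    // What setHadoopCallerContext()/getHadoopCallerContext() above boil down to.
    CallerContext.setCurrent(
        new CallerContext.Builder("hive_query_example_0001").build());

    // Later HDFS/YARN RPCs from this thread carry the context; the NameNode audit
    // log records it as callerContext=hive_query_example_0001 when enabled.
    CallerContext current = CallerContext.getCurrent();
    String ctx = (current == null) ? "" : current.getContext();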

http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index b88e8c9..3887474 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -662,4 +662,14 @@ public interface HadoopShims {
 
   /** Clones the UGI and the Subject. */
   UserGroupInformation cloneUgi(UserGroupInformation baseUgi) throws IOException;
+
+  /*
+   * Set up the caller context for HDFS and Yarn.
+   */
+  public void setHadoopCallerContext(String callerContext);
+
+  /*
+   * get current caller context of HDFS and Yarn.
+   */
+  public String getHadoopCallerContext();
 }
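
The interface additions keep Hive compilable against Hadoop versions that predate CallerContext; a shim for such a version (for example the Hadoop20SShims listed in the conflicts above) would presumably implement them as no-ops. A hypothetical sketch of that fallback, an assumption since the 0.20S side of the patch is not shown in this message:

    // Hypothetical no-op fallback for a shim without org.apache.hadoop.ipc.CallerContext.
    @Override
    public void setHadoopCallerContext(String callerContext) {
      // nothing to set: this Hadoop version has no caller-context API
    }

    @Override
    public String getHadoopCallerContext() {
      return "";  // report "unset", matching the 0.23 shim's null case
    }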