You are viewing a plain text version of this content. The canonical link for it is available in the original HTML version of this archived message (the hyperlink was lost in the plain-text conversion).
Posted to commits@hive.apache.org by om...@apache.org on 2017/05/20 04:10:06 UTC
[2/3] hive git commit: HIVE-12387: Issues in Hive's use of
CallerContext (Vikram Dixit K, reviewed by Gunther Hagleitner)
HIVE-12387: Issues in Hive's use of CallerContext (Vikram Dixit K, reviewed by Gunther Hagleitner)
Conflicts:
cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
ql/src/java/org/apache/hadoop/hive/ql/Driver.java
ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4b676cd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4b676cd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4b676cd
Branch: refs/heads/branch-2.2
Commit: f4b676cd62b304b1926615ec3b53bd65dac7c3ad
Parents: 7ea8e1c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Dec 16 10:57:35 2016 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri May 19 16:22:52 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 39 ++++++++++++++++----
.../org/apache/hadoop/hive/ql/QueryPlan.java | 35 ++++++++++++++++--
.../apache/hadoop/hive/ql/hooks/ATSHook.java | 25 ++++++++-----
.../parse/TestUpdateDeleteSemanticAnalyzer.java | 2 +-
.../service/cli/session/HiveSessionImpl.java | 10 ++++-
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 16 +++++++-
.../apache/hadoop/hive/shims/HadoopShims.java | 10 +++++
7 files changed, 114 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 08bd040..c8e2c4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -112,6 +112,8 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
@@ -396,12 +398,13 @@ public class Driver implements CommandProcessor {
}
if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
- close();
+ closeInProcess(false);
}
+
if (isInterrupted()) {
return handleInterruption("at beginning of compilation."); //indicate if need clean resource
}
-
+
if (resetTaskIds) {
TaskFactory.resetId();
}
@@ -416,6 +419,8 @@ public class Driver implements CommandProcessor {
SessionState.get().setupQueryCurrentTimestamp();
+ String originalCallerContext = "";
+ HadoopShims shim = ShimLoader.getHadoopShims();
boolean compileError = false;
try {
// Initialize the transaction manager. This must be done before analyze is called.
@@ -437,6 +442,13 @@ public class Driver implements CommandProcessor {
};
ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
+ // we set the hadoop caller context to the query id as soon as we have one.
+ // initially, the caller context is the session id (when creating temp directories)
+ originalCallerContext = shim.getHadoopCallerContext();
+ LOG.info("We are setting the hadoop caller context from " + originalCallerContext + " to "
+ + queryId);
+ shim.setHadoopQueryContext(queryId);
+
if (isInterrupted()) {
return handleInterruption("before parsing and analysing the query");
}
@@ -444,7 +456,7 @@ public class Driver implements CommandProcessor {
if (ctx == null) {
ctx = new Context(conf);
}
-
+
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
@@ -502,7 +514,9 @@ public class Driver implements CommandProcessor {
// get the output schema
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
- queryState.getHiveOperation(), schema);
+ queryState.getHiveOperation(), schema,
+ SessionState.get().getSessionId(), Thread.currentThread().getName(),
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_LOG_TRACE_ID));
conf.setQueryString(queryStr);
@@ -598,6 +612,10 @@ public class Driver implements CommandProcessor {
} else {
LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
}
+
+ // reset the caller id.
+ LOG.info("We are resetting the hadoop caller context to " + originalCallerContext);
+ shim.setHadoopCallerContext(originalCallerContext);
}
}
@@ -687,7 +705,7 @@ public class Driver implements CommandProcessor {
}
// The following union operation returns a union, which traverses over the
- // first set once and then then over each element of second set, in order,
+ // first set once and then then over each element of second set, in order,
// that is not contained in first. This means it doesn't replace anything
// in first set, and would preserve the WriteType in WriteEntity in first
// set in case of outputs list.
@@ -1009,7 +1027,7 @@ public class Driver implements CommandProcessor {
conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
if(plan.getFetchTask() != null) {
/**
- * This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which
+ * This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which
* initializes JobConf in FetchOperator before recordValidTxns() but this has to be done
* after locks are acquired to avoid race conditions in ACID.
*/
@@ -1663,8 +1681,13 @@ public class Driver implements CommandProcessor {
maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
HookContext hookContext = null;
+
+ String originalCallerContext = "";
boolean executionError = false;
try {
+ LOG.info("Setting caller context to query id " + queryId);
+ originalCallerContext = ShimLoader.getHadoopShims().getHadoopCallerContext();
+ ShimLoader.getHadoopShims().setHadoopQueryContext(queryId);
LOG.info("Executing command(queryId=" + queryId + "): " + queryStr);
// compile and execute can get called from different threads in case of HS2
// so clear timing in this thread's Hive object before proceeding.
@@ -1908,6 +1931,8 @@ public class Driver implements CommandProcessor {
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return (12);
} finally {
+ LOG.info("Resetting the caller context to " + originalCallerContext);
+ ShimLoader.getHadoopShims().setHadoopCallerContext(originalCallerContext);
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().endQuery(queryId);
}
@@ -2348,7 +2373,7 @@ public class Driver implements CommandProcessor {
this.operationId = opId;
}
- /**
+ /**
* Resets QueryState to get new queryId on Driver reuse.
*/
public void resetQueryState() {
http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index e8c8ae6..d05e53b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -106,6 +106,9 @@ public class QueryPlan implements Serializable {
private transient Long queryStartTime;
private final HiveOperation operation;
private Boolean autoCommitValue;
+ private String sessionId;
+ private String threadName;
+ private String userProvidedContext;
public QueryPlan() {
this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
@@ -113,7 +116,8 @@ public class QueryPlan implements Serializable {
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
- HiveOperation operation, Schema resultSchema) {
+ HiveOperation operation, Schema resultSchema,
+ String sessionId, String threadName, String userProvidedContext) {
this.queryString = queryString;
rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
@@ -136,6 +140,9 @@ public class QueryPlan implements Serializable {
this.operation = operation;
this.autoCommitValue = sem.getAutoCommitValue();
this.resultSchema = resultSchema;
+ this.setSessionId(sessionId);
+ this.setThreadName(threadName);
+ this.setUserProvidedContext(userProvidedContext);
}
public String getQueryStr() {
@@ -609,7 +616,6 @@ public class QueryPlan implements Serializable {
try {
q.write(oprot);
} catch (TException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
return q.toString();
}
@@ -623,7 +629,6 @@ public class QueryPlan implements Serializable {
try {
q.write(oprot);
} catch (TException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
return q.toString();
}
@@ -802,4 +807,28 @@ public class QueryPlan implements Serializable {
public Boolean getAutoCommitValue() {
return autoCommitValue;
}
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public void setSessionId(String sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public String getThreadName() {
+ return threadName;
+ }
+
+ public void setThreadName(String threadName) {
+ this.threadName = threadName;
+ }
+
+ public String getUserProvidedContext() {
+ return userProvidedContext;
+ }
+
+ public void setUserProvidedContext(String userProvidedContext) {
+ this.userProvidedContext = userProvidedContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 218e0b4..f968da6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -79,8 +79,8 @@ public class ATSHook implements ExecuteWithHookContext {
private static final String DEFAULT_ATS_DOMAIN = "hive_default_ats";
private enum OtherInfoTypes {
- QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, THREAD_NAME, VERSION,
- CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF,
+ QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, LOG_TRACE_ID, THREAD_NAME, VERSION,
+ CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF, LLAP_APP_ID
};
private enum ExecutionMode {
MR, TEZ, LLAP, SPARK, NONE
@@ -275,8 +275,7 @@ public class ATSHook implements ExecuteWithHookContext {
String query = plan.getQueryStr();
JSONObject explainPlan = explain.getJSONPlan(null,
work);
- String logID = conf.getLogIdVar(hookContext.getSessionId());
- List<String> tablesRead = getTablesFromEntitySet(hookContext
+ List<String> tablesRead = getTablesFromEntitySet(hookContext
.getInputs());
List<String> tablesWritten = getTablesFromEntitySet(hookContext
.getOutputs());
@@ -297,9 +296,9 @@ public class ATSHook implements ExecuteWithHookContext {
hookContext.getIpAddress(),
hiveInstanceAddress, hiveInstanceType,
hookContext.getSessionId(),
- logID,
+ plan.getUserProvidedContext(),
hookContext.getThreadId(), executionMode,
- tablesRead, tablesWritten, conf));
+ tablesRead, tablesWritten, conf, llapId, domainId));
break;
case POST_EXEC_HOOK:
fireAndForget(createPostHookEvent(queryId,
@@ -361,8 +360,10 @@ public class ATSHook implements ExecuteWithHookContext {
TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType,
- String sessionID, String logID, String threadId, String executionMode,
- List<String> tablesRead, List<String> tablesWritten, HiveConf conf) throws Exception {
+ String sessionID, String logTraceId, String threadId, String executionMode,
+ List<String> tablesRead, List<String> tablesWritten, HiveConf conf, ApplicationId llapAppId,
+ String domainId)
+ throws Exception {
JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
queryObj.put("queryText", query);
@@ -409,8 +410,10 @@ public class ATSHook implements ExecuteWithHookContext {
atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0);
atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
atsEntity.addOtherInfo(OtherInfoTypes.SESSION_ID.name(), sessionID);
- atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadId);
+ if ((logTraceId != null) && (logTraceId.equals("") == false)) {
+ atsEntity.addOtherInfo(OtherInfoTypes.LOG_TRACE_ID.name(), logTraceId);
+ }
atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
if (clientIpAddress != null) {
atsEntity.addOtherInfo(OtherInfoTypes.CLIENT_IP_ADDRESS.name(), clientIpAddress);
@@ -418,6 +421,10 @@ public class ATSHook implements ExecuteWithHookContext {
atsEntity.addOtherInfo(OtherInfoTypes.HIVE_ADDRESS.name(), hiveInstanceAddress);
atsEntity.addOtherInfo(OtherInfoTypes.HIVE_INSTANCE_TYPE.name(), hiveInstanceType);
atsEntity.addOtherInfo(OtherInfoTypes.CONF.name(), confObj.toString());
+ if (llapAppId != null) {
+ atsEntity.addOtherInfo(OtherInfoTypes.LLAP_APP_ID.name(), llapAppId.toString());
+ }
+ atsEntity.setDomainId(domainId);
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
index a573808..9fa3b25 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
@@ -283,7 +283,7 @@ public class TestUpdateDeleteSemanticAnalyzer {
// validate the plan
sem.validate();
- QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null);
+ QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null, "", "", "");
return new ReturnInfo(sem, plan);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 3f9dfdc..4d5cdac 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -355,7 +355,7 @@ public class HiveSessionImpl implements HiveSession {
}
private synchronized void acquireAfterOpLock(boolean userAccess) {
- // Need to make sure that the this HiveServer2's session's SessionState is
+ // Need to make sure that this HiveServer2's session's session state is
// stored in the thread local for the handler thread.
SessionState.setCurrentSessionState(sessionState);
sessionState.setForwardedAddresses(SessionManager.getForwardedAddresses());
@@ -365,6 +365,10 @@ public class HiveSessionImpl implements HiveSession {
}
// set the thread name with the logging prefix.
sessionState.updateThreadName();
+ // set the log context for debugging
+ LOG.info("We are setting the hadoop caller context to " + sessionState.getSessionId()
+ + " for thread " + Thread.currentThread().getName());
+ ShimLoader.getHadoopShims().setHadoopCallerContext(sessionState.getSessionId());
Hive.set(sessionHive);
}
@@ -390,6 +394,10 @@ public class HiveSessionImpl implements HiveSession {
// can be null in-case of junit tests. skip reset.
// reset thread name at release time.
sessionState.resetThreadName();
+ // reset the HDFS caller context.
+ LOG.info("We are resetting the hadoop caller context for thread "
+ + Thread.currentThread().getName());
+ ShimLoader.getHadoopShims().setHadoopCallerContext("");
}
SessionState.detachSession();
http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 916a96b..7d31f03 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -1383,7 +1384,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
try {
Subject origSubject = (Subject) getSubjectMethod.invoke(baseUgi);
-
+
Subject subject = new Subject(false, origSubject.getPrincipals(),
cloneCredentials(origSubject.getPublicCredentials()),
cloneCredentials(origSubject.getPrivateCredentials()));
@@ -1401,5 +1402,16 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
return set;
}
-
+
+ public void setHadoopCallerContext(String callerContext) {
+ CallerContext.setCurrent(new CallerContext.Builder(callerContext).build());
+ }
+
+ @Override
+ public String getHadoopCallerContext() {
+ if (CallerContext.getCurrent() == null) {
+ return "";
+ }
+ return CallerContext.getCurrent().getContext();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f4b676cd/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index b88e8c9..3887474 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -662,4 +662,14 @@ public interface HadoopShims {
/** Clones the UGI and the Subject. */
UserGroupInformation cloneUgi(UserGroupInformation baseUgi) throws IOException;
+
+ /*
+ * Set up the caller context for HDFS and Yarn.
+ */
+ public void setHadoopCallerContext(String callerContext);
+
+ /*
+ * get current caller context of HDFS and Yarn.
+ */
+ public String getHadoopCallerContext();
}