You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Chaozhong Yang (JIRA)" <ji...@apache.org> on 2017/07/13 02:11:00 UTC
[jira] [Created] (SPARK-21395) Spark SQL hive-thriftserver doesn't
register operation log before execute sql statement
Chaozhong Yang created SPARK-21395:
--------------------------------------
Summary: Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
Key: SPARK-21395
URL: https://issues.apache.org/jira/browse/SPARK-21395
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.1, 2.1.0
Reporter: Chaozhong Yang
In HiveServer2, TFetchResultsReq has a member named `fetchType`. If fetchType is equal to `1`, the thrift server should return the operation log to the client. However, we found that Spark SQL's thrift server always returns nothing to the client for a TFetchResultsReq with fetchType(1). We
have checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that the operation log files there exist but are zero bytes long (empty files). Why? Let's take a look at SQLOperation.java in Hive:
{code:java}
/**
 * Starts execution of this SQL operation, either on the calling thread or on a
 * background thread.
 *
 * The operation is first marked PENDING and prepared with the per-operation HiveConf.
 * If shouldRunAsync() is false the query runs inline; otherwise the query is wrapped in a
 * Runnable and submitted to the session manager's background pool, and the resulting
 * Future is stored via setBackgroundHandle.
 *
 * @throws HiveSQLException if the background pool rejects the task; prepare() and the
 *         inline runQuery() call presumably can raise it as well (their bodies are not shown)
 */
@Override
public void runInternal() throws HiveSQLException {
setState(OperationState.PENDING);
final HiveConf opConfig = getConfigForOperation();
prepare(opConfig);
if (!shouldRunAsync()) {
// Synchronous path: execute on the caller's (handler) thread.
runQuery(opConfig);
} else {
// We'll pass ThreadLocals in the background thread from the foreground (handler) thread
final SessionState parentSessionState = SessionState.get();
// ThreadLocal Hive object needs to be set in background thread.
// The metastore client in Hive is associated with right user.
final Hive parentHive = getSessionHive();
// Current UGI will get used by metastore when metastore is in embedded mode
// So this needs to get passed to the new background thread
final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
// Runnable impl to call runInternal asynchronously,
// from a different thread
Runnable backgroundOperation = new Runnable() {
@Override
public void run() {
// The action executed under currentUGI.doAs(...) below.
PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws HiveSQLException {
// Re-install the values captured on the foreground thread into this thread.
Hive.set(parentHive);
SessionState.setCurrentSessionState(parentSessionState);
// Set current OperationLog in this async thread for keeping on saving query log.
registerCurrentOperationLog();
try {
runQuery(opConfig);
} catch (HiveSQLException e) {
// Record the failure on the operation instead of propagating it out of the task.
setOperationException(e);
LOG.error("Error running hive query: ", e);
} finally {
// Always undo the registration made before runQuery, on success or failure.
unregisterOperationLog();
}
return null;
}
};
try {
currentUGI.doAs(doAsAction);
} catch (Exception e) {
// Any other failure from doAs is wrapped and recorded on the operation.
setOperationException(new HiveSQLException(e));
LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
}
finally {
/**
* We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
* when this thread is garbage collected later.
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
*/
if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
ThreadWithGarbageCleanup currentThread =
(ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
currentThread.cacheThreadLocalRawStore();
}
}
}
};
try {
// This submit blocks if no background threads are available to run this operation
Future<?> backgroundHandle =
getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
// Keep the Future on the operation (how it is used later is not shown here).
setBackgroundHandle(backgroundHandle);
} catch (RejectedExecutionException rejected) {
// The pool refused the task: mark the operation failed and surface the error to the caller.
setState(OperationState.ERROR);
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected);
}
}
}
{code}
Clearly, the call to registerCurrentOperationLog() is the key step that lets Hive produce the operation log and return it to the client.
But in Spark SQL, SparkExecuteStatementOperation doesn't register the operation log before executing the SQL statement:
{code:scala}
/**
 * Starts execution of the statement, either on the calling thread or on a background
 * thread.
 *
 * The operation is marked PENDING and flagged as having a result set. If
 * `runInBackground` is false, `execute()` runs inline; otherwise `execute()` is wrapped
 * in a Runnable, run under the UGI returned by `Utils.getUGI()`, and submitted to the
 * session manager's background pool.
 *
 * NOTE(review): no call visible in this method registers an operation log for the thread
 * that runs `execute()`, in either path. Unless `execute()` does so itself (not shown),
 * log events emitted on that thread have no operation log to be written to. Confirm.
 */
override def runInternal(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true) // avoid no resultset for async run
if (!runInBackground) {
// Synchronous path: run on the caller's thread.
execute()
} else {
val sparkServiceUGI = Utils.getUGI()
// Runnable impl to call runInternal asynchronously,
// from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
// The action executed under sparkServiceUGI.doAs(...) below.
val doAsAction = new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
execute()
} catch {
// Record the failure on the operation instead of propagating it out of the task.
case e: HiveSQLException =>
setOperationException(e)
log.error("Error running hive query: ", e)
}
}
}
try {
sparkServiceUGI.doAs(doAsAction)
} catch {
// Any other exception from doAs is wrapped and recorded on the operation.
case e: Exception =>
setOperationException(new HiveSQLException(e))
logError("Error running hive query as user : " +
sparkServiceUGI.getShortUserName(), e)
}
}
}
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
// Keep the returned handle on the operation (how it is used later is not shown here).
setBackgroundHandle(backgroundHandle)
} catch {
// The pool refused the task: mark the operation failed and surface a HiveSQLException.
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
// Any other non-fatal submission failure: log, mark failed, and rethrow unchanged.
case NonFatal(e) =>
logError(s"Error executing query in background", e)
setState(OperationState.ERROR)
throw e
}
}
}
{code}
LogDivertAppender appends logOutput to the operation log file only if an OperationLog is registered for the current thread (it is looked up as a thread-local):
{code:java}
/**
 * Appends one logging event and forwards the rendered text to the OperationLog
 * associated with the current thread, if there is one.
 *
 * @param event the logging event being appended
 */
@Override
protected void subAppend(LoggingEvent event) {
super.subAppend(event);
// That should've gone into our writer. Notify the LogContext.
String logOutput = writer.toString();
// Clear the writer right after reading it, before the thread lookup below.
writer.reset();
// Look up the OperationLog for the thread executing this append.
OperationLog log = operationManager.getOperationLogByThread();
if (log == null) {
// No OperationLog for this thread: the text captured above is discarded.
LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
return;
}
log.writeOperationLog(logOutput);
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org