Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2017/07/16 20:25:01 UTC
[jira] [Commented] (SPARK-21395) Spark SQL hive-thriftserver
doesn't register operation log before execute sql statement
[ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089106#comment-16089106 ]
Apache Spark commented on SPARK-21395:
--------------------------------------
User 'debugger87' has created a pull request for this issue:
https://github.com/apache/spark/pull/18649
> Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-21395
> URL: https://issues.apache.org/jira/browse/SPARK-21395
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0, 2.1.1
> Reporter: Chaozhong Yang
>
> In HiveServer2, TFetchResultsReq has a member named `fetchType`. If fetchType equals `1`, the thrift server should return the operation log to the client. However, we found that Spark SQL's thrift server always returns nothing to the client for a TFetchResultsReq with fetchType(1). We checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that the operation log files exist but are empty (zero bytes). Why? Let's take a look at SQLOperation.java in Hive:
> {code:java}
> @Override
> public void runInternal() throws HiveSQLException {
>   setState(OperationState.PENDING);
>   final HiveConf opConfig = getConfigForOperation();
>   prepare(opConfig);
>   if (!shouldRunAsync()) {
>     runQuery(opConfig);
>   } else {
>     // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
>     final SessionState parentSessionState = SessionState.get();
>     // ThreadLocal Hive object needs to be set in background thread.
>     // The metastore client in Hive is associated with right user.
>     final Hive parentHive = getSessionHive();
>     // Current UGI will get used by metastore when metastore is in embedded mode
>     // So this needs to get passed to the new background thread
>     final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
>     // Runnable impl to call runInternal asynchronously,
>     // from a different thread
>     Runnable backgroundOperation = new Runnable() {
>       @Override
>       public void run() {
>         PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
>           @Override
>           public Object run() throws HiveSQLException {
>             Hive.set(parentHive);
>             SessionState.setCurrentSessionState(parentSessionState);
>             // Set current OperationLog in this async thread for keeping on saving query log.
>             registerCurrentOperationLog();
>             try {
>               runQuery(opConfig);
>             } catch (HiveSQLException e) {
>               setOperationException(e);
>               LOG.error("Error running hive query: ", e);
>             } finally {
>               unregisterOperationLog();
>             }
>             return null;
>           }
>         };
>         try {
>           currentUGI.doAs(doAsAction);
>         } catch (Exception e) {
>           setOperationException(new HiveSQLException(e));
>           LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
>         }
>         finally {
>           /**
>            * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
>            * when this thread is garbage collected later.
>            * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
>            */
>           if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
>             ThreadWithGarbageCleanup currentThread =
>                 (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
>             currentThread.cacheThreadLocalRawStore();
>           }
>         }
>       }
>     };
>     try {
>       // This submit blocks if no background threads are available to run this operation
>       Future<?> backgroundHandle =
>           getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
>       setBackgroundHandle(backgroundHandle);
>     } catch (RejectedExecutionException rejected) {
>       setState(OperationState.ERROR);
>       throw new HiveSQLException("The background threadpool cannot accept" +
>           " new task for execution, please retry the operation", rejected);
>     }
>   }
> }
> {code}
> The call to registerCurrentOperationLog() in the background thread is what allows Hive to produce the operation log and return it to the client.
> In Spark SQL, however, SparkExecuteStatementOperation does not register the operation log before executing the SQL statement:
> {code:scala}
> override def runInternal(): Unit = {
>   setState(OperationState.PENDING)
>   setHasResultSet(true) // avoid no resultset for async run
>   if (!runInBackground) {
>     execute()
>   } else {
>     val sparkServiceUGI = Utils.getUGI()
>     // Runnable impl to call runInternal asynchronously,
>     // from a different thread
>     val backgroundOperation = new Runnable() {
>       override def run(): Unit = {
>         val doAsAction = new PrivilegedExceptionAction[Unit]() {
>           override def run(): Unit = {
>             try {
>               execute()
>             } catch {
>               case e: HiveSQLException =>
>                 setOperationException(e)
>                 log.error("Error running hive query: ", e)
>             }
>           }
>         }
>         try {
>           sparkServiceUGI.doAs(doAsAction)
>         } catch {
>           case e: Exception =>
>             setOperationException(new HiveSQLException(e))
>             logError("Error running hive query as user : " +
>               sparkServiceUGI.getShortUserName(), e)
>         }
>       }
>     }
>     try {
>       // This submit blocks if no background threads are available to run this operation
>       val backgroundHandle =
>         parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>       setBackgroundHandle(backgroundHandle)
>     } catch {
>       case rejected: RejectedExecutionException =>
>         setState(OperationState.ERROR)
>         throw new HiveSQLException("The background threadpool cannot accept" +
>           " new task for execution, please retry the operation", rejected)
>       case NonFatal(e) =>
>         logError(s"Error executing query in background", e)
>         setState(OperationState.ERROR)
>         throw e
>     }
>   }
> }
> {code}
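
The reason registration has to happen inside the background thread can be illustrated with plain Java. The following is a minimal sketch, not Hive or Spark code: `ThreadLocalVisibilitySketch` and `CURRENT_LOG` are hypothetical names standing in for the per-thread "current operation log" slot, and the single-thread executor stands in for the background operation pool. It shows that a value set in a thread-local on the handler thread is not visible to a task submitted to another thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalVisibilitySketch {
    // Hypothetical stand-in for the per-thread "current operation log" slot.
    private static final ThreadLocal<String> CURRENT_LOG = new ThreadLocal<>();

    /** Sets the slot on the calling thread, then reports what a pool thread sees in it. */
    static String seenByBackgroundThread(String valueSetOnHandlerThread) {
        CURRENT_LOG.set(valueSetOnHandlerThread); // set on the calling (handler) thread only
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> readSlot = () -> CURRENT_LOG.get(); // runs on the pool thread
            return pool.submit(readSlot).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
            CURRENT_LOG.remove(); // clean up the calling thread's slot
        }
    }

    public static void main(String[] args) {
        System.out.println("background thread sees: " + seenByBackgroundThread("op-log-1"));
    }
}
```

Under these assumptions the background thread reads an empty slot, which is why a handler-thread registration alone would not help an asynchronous `execute()`.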
> LogDivertAppender appends logOutput to the operation log file only if the current thread has a thread-local operationLog registered:
> {code:java}
> @Override
> protected void subAppend(LoggingEvent event) {
>   super.subAppend(event);
>   // That should've gone into our writer. Notify the LogContext.
>   String logOutput = writer.toString();
>   writer.reset();
>   OperationLog log = operationManager.getOperationLogByThread();
>   if (log == null) {
>     LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
>     return;
>   }
>   log.writeOperationLog(logOutput);
> }
> {code}
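
The effect of the appender logic above on a background query can be sketched in self-contained Java. This is an illustration under stated assumptions, not the Hive or Spark implementation: `OperationLogRoutingSketch`, `CURRENT_LOG`, `append`, and `runInBackground` are hypothetical names, and a `StringBuilder` stands in for the operation log file. The sketch mimics the register / run / unregister pattern from SQLOperation and shows that the log stays empty when the background thread never registers it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OperationLogRoutingSketch {
    // Hypothetical stand-in for the thread-local consulted by getOperationLogByThread().
    private static final ThreadLocal<StringBuilder> CURRENT_LOG = new ThreadLocal<>();

    /** Mimics the appender: write to the current thread's log, or drop the event. */
    static void append(String line) {
        StringBuilder log = CURRENT_LOG.get();
        if (log == null) {
            return; // no log registered on this thread, so the event is dropped
        }
        log.append(line).append('\n');
    }

    /** Runs a fake query on a pool thread and returns the operation log contents. */
    static String runInBackground(boolean registerLog) {
        StringBuilder operationLog = new StringBuilder();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> {
                if (registerLog) {
                    CURRENT_LOG.set(operationLog); // register on the background thread
                }
                try {
                    append("query started");
                    append("query finished");
                } finally {
                    CURRENT_LOG.remove(); // unregister, even if the query fails
                }
            }).get(); // wait for the background task to finish
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return operationLog.toString();
    }

    public static void main(String[] args) {
        System.out.println("without registration, log is empty: " + runInBackground(false).isEmpty());
        System.out.println("with registration, log is empty: " + runInBackground(true).isEmpty());
    }
}
```

The unregistered run corresponds to the zero-byte log files described in the issue; the registered run corresponds to what Hive's SQLOperation does around `runQuery`.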