You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sameer Agarwal (JIRA)" <ji...@apache.org> on 2018/01/08 20:08:01 UTC

[jira] [Updated] (SPARK-21395) Spark SQL hive-thriftserver doesn't register operation log before execute sql statement

     [ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sameer Agarwal updated SPARK-21395:
-----------------------------------
    Target Version/s: 2.4.0  (was: 2.3.0)

> Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-21395
>                 URL: https://issues.apache.org/jira/browse/SPARK-21395
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: Chaozhong Yang
>
> In HiveServer2, TFetchResultsReq has a member named `fetchType`. If fetchType is equal to `1`, the thrift server should return the operation log to the client. However, we found that Spark SQL's thrift server always returns nothing to the client for a TFetchResultsReq with fetchType(1). We
> have checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that the operation log files exist but contain zero bytes (empty files). Why? Let's take a look at SQLOperation.java in Hive:
> {code:java}
>   @Override
>   public void runInternal() throws HiveSQLException {
>     setState(OperationState.PENDING);
>     final HiveConf opConfig = getConfigForOperation();
>     prepare(opConfig);
>     if (!shouldRunAsync()) {
>       runQuery(opConfig);
>     } else {
>       // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
>       final SessionState parentSessionState = SessionState.get();
>       // ThreadLocal Hive object needs to be set in background thread.
>       // The metastore client in Hive is associated with right user.
>       final Hive parentHive = getSessionHive();
>       // Current UGI will get used by metastore when metsatore is in embedded mode
>       // So this needs to get passed to the new background thread
>       final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
>       // Runnable impl to call runInternal asynchronously,
>       // from a different thread
>       Runnable backgroundOperation = new Runnable() {
>         @Override
>         public void run() {
>           PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
>             @Override
>             public Object run() throws HiveSQLException {
>               Hive.set(parentHive);
>               SessionState.setCurrentSessionState(parentSessionState);
>               // Set current OperationLog in this async thread for keeping on saving query log.
>               registerCurrentOperationLog();
>               try {
>                 runQuery(opConfig);
>               } catch (HiveSQLException e) {
>                 setOperationException(e);
>                 LOG.error("Error running hive query: ", e);
>               } finally {
>                 unregisterOperationLog();
>               }
>               return null;
>             }
>           };
>           try {
>             currentUGI.doAs(doAsAction);
>           } catch (Exception e) {
>             setOperationException(new HiveSQLException(e));
>             LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
>           }
>           finally {
>             /**
>              * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
>              * when this thread is garbage collected later.
>              * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
>              */
>             if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
>               ThreadWithGarbageCleanup currentThread =
>                   (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
>               currentThread.cacheThreadLocalRawStore();
>             }
>           }
>         }
>       };
>       try {
>         // This submit blocks if no background threads are available to run this operation
>         Future<?> backgroundHandle =
>             getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
>         setBackgroundHandle(backgroundHandle);
>       } catch (RejectedExecutionException rejected) {
>         setState(OperationState.ERROR);
>         throw new HiveSQLException("The background threadpool cannot accept" +
>             " new task for execution, please retry the operation", rejected);
>       }
>     }
>   }
> {code}
> Obviously, registering the operation log (the registerCurrentOperationLog() call above) is the key step that lets Hive produce the operation log and return it to the client.
> But in Spark SQL, SparkExecuteStatementOperation doesn't register the operation log before executing the SQL statement:
> {code:scala}
>   override def runInternal(): Unit = {
>     setState(OperationState.PENDING)
>     setHasResultSet(true) // avoid no resultset for async run
>     if (!runInBackground) {
>       execute()
>     } else {
>       val sparkServiceUGI = Utils.getUGI()
>       // Runnable impl to call runInternal asynchronously,
>       // from a different thread
>       val backgroundOperation = new Runnable() {
>         override def run(): Unit = {
>           val doAsAction = new PrivilegedExceptionAction[Unit]() {
>             override def run(): Unit = {
>               try {
>                 execute()
>               } catch {
>                 case e: HiveSQLException =>
>                   setOperationException(e)
>                   log.error("Error running hive query: ", e)
>               }
>             }
>           }
>           try {
>             sparkServiceUGI.doAs(doAsAction)
>           } catch {
>             case e: Exception =>
>               setOperationException(new HiveSQLException(e))
>               logError("Error running hive query as user : " +
>                 sparkServiceUGI.getShortUserName(), e)
>           }
>         }
>       }
>       try {
>         // This submit blocks if no background threads are available to run this operation
>         val backgroundHandle =
>           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>         setBackgroundHandle(backgroundHandle)
>       } catch {
>         case rejected: RejectedExecutionException =>
>           setState(OperationState.ERROR)
>           throw new HiveSQLException("The background threadpool cannot accept" +
>             " new task for execution, please retry the operation", rejected)
>         case NonFatal(e) =>
>           logError(s"Error executing query in background", e)
>           setState(OperationState.ERROR)
>           throw e
>       }
>     }
>   }
> {code}
> LogDivertAppender appends logOutput to the operation log file only if an OperationLog is registered for the current thread (it is looked up via a thread-local); otherwise the log event is dropped:
> {code:java}
>   @Override
>   protected void subAppend(LoggingEvent event) {
>     super.subAppend(event);
>     // That should've gone into our writer. Notify the LogContext.
>     String logOutput = writer.toString();
>     writer.reset();
>     OperationLog log = operationManager.getOperationLogByThread();
>     if (log == null) {
>       LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
>       return;
>     }
>     log.writeOperationLog(logOutput);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org