Posted to issues@spark.apache.org by "Chaozhong Yang (JIRA)" <ji...@apache.org> on 2017/07/13 02:11:00 UTC

[jira] [Created] (SPARK-21395) Spark SQL hive-thriftserver doesn't register operation log before execute sql statement

Chaozhong Yang created SPARK-21395:
--------------------------------------

             Summary: Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
                 Key: SPARK-21395
                 URL: https://issues.apache.org/jira/browse/SPARK-21395
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.1, 2.1.0
            Reporter: Chaozhong Yang


In HiveServer2, TFetchResultsReq has a member named `fetchType`. If `fetchType` is `1`, the Thrift server should return the operation log to the client. However, we found that Spark SQL's Thrift server always returns nothing to the client for a TFetchResultsReq with fetchType = 1. We checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that the operation log files did exist, but were empty (zero bytes). Why? Let's take a look at SQLOperation.java in Hive:


{code:java}
  @Override
  public void runInternal() throws HiveSQLException {
    setState(OperationState.PENDING);
    final HiveConf opConfig = getConfigForOperation();
    prepare(opConfig);
    if (!shouldRunAsync()) {
      runQuery(opConfig);
    } else {
      // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
      final SessionState parentSessionState = SessionState.get();
      // ThreadLocal Hive object needs to be set in background thread.
      // The metastore client in Hive is associated with right user.
      final Hive parentHive = getSessionHive();
      // Current UGI will get used by metastore when metastore is in embedded mode
      // So this needs to get passed to the new background thread
      final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
      // Runnable impl to call runInternal asynchronously,
      // from a different thread
      Runnable backgroundOperation = new Runnable() {
        @Override
        public void run() {
          PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws HiveSQLException {
              Hive.set(parentHive);
              SessionState.setCurrentSessionState(parentSessionState);
              // Set current OperationLog in this async thread for keeping on saving query log.
              registerCurrentOperationLog();
              try {
                runQuery(opConfig);
              } catch (HiveSQLException e) {
                setOperationException(e);
                LOG.error("Error running hive query: ", e);
              } finally {
                unregisterOperationLog();
              }
              return null;
            }
          };

          try {
            currentUGI.doAs(doAsAction);
          } catch (Exception e) {
            setOperationException(new HiveSQLException(e));
            LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
          }
          finally {
            /**
             * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
             * when this thread is garbage collected later.
             * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
             */
            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
              ThreadWithGarbageCleanup currentThread =
                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
              currentThread.cacheThreadLocalRawStore();
            }
          }
        }
      };
      try {
        // This submit blocks if no background threads are available to run this operation
        Future<?> backgroundHandle =
            getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
        setBackgroundHandle(backgroundHandle);
      } catch (RejectedExecutionException rejected) {
        setState(OperationState.ERROR);
        throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected);
      }
    }
  }
{code}

Obviously, registerCurrentOperationLog is the key call that lets Hive produce the operation log and return it to the client.
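
That registration boils down to publishing the operation's OperationLog into a thread-local that LogDivertAppender later looks up; a thread that never registers gets null, so its output is silently dropped. Below is a minimal Scala sketch of that contract, assuming the OperationLog class bundled with the thrift-server (org.apache.hive.service.cli.OperationLog in Hive 1.2.x; it moved to org.apache.hadoop.hive.ql.session in Hive 2.x) and its OperationLog(name, file, HiveConf) constructor; the temp-file and HiveConf setup are only illustrative:

{code:scala}
import java.io.File

import org.apache.hadoop.hive.conf.HiveConf
// Hive 1.2.x location; Hive 2.x moved this class to org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.OperationLog

object OperationLogThreadLocalSketch {
  def main(args: Array[String]): Unit = {
    val logFile = File.createTempFile("operation-log-sketch", ".log")
    val opLog = new OperationLog("sketch-operation", logFile, new HiveConf())

    // Register on the current thread: LogDivertAppender's
    // getOperationLogByThread() would now find this OperationLog.
    OperationLog.setCurrentOperationLog(opLog)
    println(OperationLog.getCurrentOperationLog != null)  // true

    // A thread that never registers (like Spark's background execution thread)
    // sees no OperationLog, so LogDivertAppender drops its log events.
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        println(OperationLog.getCurrentOperationLog == null)  // expected: true
    })
    worker.start()
    worker.join()

    OperationLog.removeCurrentOperationLog()
    opLog.close()
    logFile.delete()
  }
}
{code}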

In Spark SQL, however, SparkExecuteStatementOperation does not register the operation log before executing the SQL statement:

{code:scala}
  override def runInternal(): Unit = {
    setState(OperationState.PENDING)
    setHasResultSet(true) // avoid no resultset for async run

    if (!runInBackground) {
      execute()
    } else {
      val sparkServiceUGI = Utils.getUGI()

      // Runnable impl to call runInternal asynchronously,
      // from a different thread
      val backgroundOperation = new Runnable() {

        override def run(): Unit = {
          val doAsAction = new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              try {
                execute()
              } catch {
                case e: HiveSQLException =>
                  setOperationException(e)
                  log.error("Error running hive query: ", e)
              }
            }
          }

          try {
            sparkServiceUGI.doAs(doAsAction)
          } catch {
            case e: Exception =>
              setOperationException(new HiveSQLException(e))
              logError("Error running hive query as user : " +
                sparkServiceUGI.getShortUserName(), e)
          }
        }
      }
      try {
        // This submit blocks if no background threads are available to run this operation
        val backgroundHandle =
          parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          setState(OperationState.ERROR)
          throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected)
        case NonFatal(e) =>
          logError(s"Error executing query in background", e)
          setState(OperationState.ERROR)
          throw e
      }
    }
  }
{code}
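
A minimal sketch of one possible fix, mirroring Hive's SQLOperation: register the operation's log in the background thread before execute() and unregister it afterwards. This assumes the operationLog and isOperationLogEnabled fields inherited from Hive's Operation base class are usable from SparkExecuteStatementOperation, and it reuses the OperationLog thread-local helpers shown above; it is a sketch, not the exact patch:

{code:scala}
// Sketch only: additions to SparkExecuteStatementOperation, which extends Hive's
// ExecuteStatementOperation and therefore inherits `operationLog` and
// `isOperationLogEnabled` from Operation.
private def registerCurrentOperationLog(): Unit = {
  if (isOperationLogEnabled) {
    if (operationLog == null) {
      logWarning("Failed to get current OperationLog object of Operation: " +
        getHandle.getHandleIdentifier)
      isOperationLogEnabled = false
    } else {
      // Publish this operation's log into the thread-local read by LogDivertAppender.
      OperationLog.setCurrentOperationLog(operationLog)
    }
  }
}

// ...and in the background runnable, bracket execute() with register/unregister:
val doAsAction = new PrivilegedExceptionAction[Unit]() {
  override def run(): Unit = {
    registerCurrentOperationLog()
    try {
      execute()
    } catch {
      case e: HiveSQLException =>
        setOperationException(e)
        log.error("Error running hive query: ", e)
    } finally {
      // Clear the thread-local (as Hive's unregisterOperationLog() does) so it
      // does not leak into the next operation run on this pooled background thread.
      OperationLog.removeCurrentOperationLog()
    }
  }
}
{code}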

LogDivertAppender appends logOutput to the operation log file, and it depends on the current thread-local OperationLog:

{code:java}
  @Override
  protected void subAppend(LoggingEvent event) {
    super.subAppend(event);
    // That should've gone into our writer. Notify the LogContext.
    String logOutput = writer.toString();
    writer.reset();

    OperationLog log = operationManager.getOperationLogByThread();
    if (log == null) {
      LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
      return;
    }
    log.writeOperationLog(logOutput);
  }
{code}
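
To make the client-visible symptom concrete, here is a hedged sketch of how a client can poll the operation log while a statement runs, using the Hive JDBC driver's HiveStatement.getQueryLog()/hasMoreLogs() (which issue TFetchResultsReq with fetchType = 1 under the hood). The connection URL, credentials, and query are illustrative assumptions; against the affected Spark Thrift Server this loop prints nothing, while against HiveServer2 it streams the query log:

{code:scala}
import java.sql.DriverManager

import scala.collection.JavaConverters._

import org.apache.hive.jdbc.HiveStatement

object FetchOperationLogSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Illustrative URL: a Spark Thrift Server (or HiveServer2) on the default port.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement().asInstanceOf[HiveStatement]

    // Run the statement in another thread so we can poll its log while it executes.
    val runner = new Thread(new Runnable {
      override def run(): Unit = stmt.execute("SELECT count(*) FROM some_table")
    })
    runner.start()
    Thread.sleep(1000)  // sketch-level synchronization: give the statement a moment to start

    // Poll the operation log; on the affected Spark Thrift Server nothing is returned.
    while (stmt.hasMoreLogs) {
      stmt.getQueryLog().asScala.foreach(println)
      Thread.sleep(500)
    }

    runner.join()
    stmt.close()
    conn.close()
  }
}
{code}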




