You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Chaozhong Yang (JIRA)" <ji...@apache.org> on 2017/07/13 02:16:01 UTC

[jira] [Updated] (SPARK-21395) Spark SQL hive-thriftserver doesn't register operation log before execute sql statement

     [ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chaozhong Yang updated SPARK-21395:
-----------------------------------
    Description: 
In HiveServer2, TFetchResultsReq has a member which is named as `fetchType`. If fetchType is equal to be `1`, the thrift server should return operation log to client. However, we found Spark SQL's thrift server always return nothing to client for  TFetchResultsReq with fetchType(1). We 
 have checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that there were existed operation log files with zero bytes(empty file). Why? Let's take a look at SQLOperation.java in Hive:


{code:java}
  @Override
  public void runInternal() throws HiveSQLException {
    setState(OperationState.PENDING);
    final HiveConf opConfig = getConfigForOperation();
    prepare(opConfig);
    if (!shouldRunAsync()) {
      runQuery(opConfig);
    } else {
      // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
      final SessionState parentSessionState = SessionState.get();
      // ThreadLocal Hive object needs to be set in background thread.
      // The metastore client in Hive is associated with right user.
      final Hive parentHive = getSessionHive();
      // Current UGI will get used by metastore when metsatore is in embedded mode
      // So this needs to get passed to the new background thread
      final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
      // Runnable impl to call runInternal asynchronously,
      // from a different thread
      Runnable backgroundOperation = new Runnable() {
        @Override
        public void run() {
          PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws HiveSQLException {
              Hive.set(parentHive);
              SessionState.setCurrentSessionState(parentSessionState);
              // Set current OperationLog in this async thread for keeping on saving query log.
              registerCurrentOperationLog();
              try {
                runQuery(opConfig);
              } catch (HiveSQLException e) {
                setOperationException(e);
                LOG.error("Error running hive query: ", e);
              } finally {
                unregisterOperationLog();
              }
              return null;
            }
          };

          try {
            currentUGI.doAs(doAsAction);
          } catch (Exception e) {
            setOperationException(new HiveSQLException(e));
            LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
          }
          finally {
            /**
             * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
             * when this thread is garbage collected later.
             * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
             */
            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
              ThreadWithGarbageCleanup currentThread =
                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
              currentThread.cacheThreadLocalRawStore();
            }
          }
        }
      };
      try {
        // This submit blocks if no background threads are available to run this operation
        Future<?> backgroundHandle =
            getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
        setBackgroundHandle(backgroundHandle);
      } catch (RejectedExecutionException rejected) {
        setState(OperationState.ERROR);
        throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected);
      }
    }
  }
{code}

Obviously, registerOperationLog is the key point that Hive can produce and return operation log to client.

But, in Spark SQL, SparkExecuteStatementOperation doesn't registerOperationLog before execute sql statement:

{code:scala}
  override def runInternal(): Unit = {
    setState(OperationState.PENDING)
    setHasResultSet(true) // avoid no resultset for async run

    if (!runInBackground) {
      execute()
    } else {
      val sparkServiceUGI = Utils.getUGI()

      // Runnable impl to call runInternal asynchronously,
      // from a different thread
      val backgroundOperation = new Runnable() {

        override def run(): Unit = {
          val doAsAction = new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              try {
                execute()
              } catch {
                case e: HiveSQLException =>
                  setOperationException(e)
                  log.error("Error running hive query: ", e)
              }
            }
          }

          try {
            sparkServiceUGI.doAs(doAsAction)
          } catch {
            case e: Exception =>
              setOperationException(new HiveSQLException(e))
              logError("Error running hive query as user : " +
                sparkServiceUGI.getShortUserName(), e)
          }
        }
      }
      try {
        // This submit blocks if no background threads are available to run this operation
        val backgroundHandle =
          parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          setState(OperationState.ERROR)
          throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected)
        case NonFatal(e) =>
          logError(s"Error executing query in background", e)
          setState(OperationState.ERROR)
          throw e
      }
    }
  }
{code}

LogDrivertAppender append logOutput into operation log file depends on current thread local operationLog:

{code:java}
  @Override
  protected void subAppend(LoggingEvent event) {
    super.subAppend(event);
    // That should've gone into our writer. Notify the LogContext.
    String logOutput = writer.toString();
    writer.reset();

    OperationLog log = operationManager.getOperationLogByThread();
    if (log == null) {
      LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
      return;
    }
    log.writeOperationLog(logOutput);
  }
{code}



  was:
In HiveServer2, TFetchResultsReq has a member which is named as `fetchType`. If fetchType is equal to be `1`, the thrift server should return operation log to client. However, we found Spark SQL's thrift server always return nothing to client for  TFetchResultsReq with fetchType(1). We 
 have checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that there were existed operation log file with zero bytes(empty file). Why? Let's take a look at SQLOperation.java in Hive:


{code:java}
  @Override
  public void runInternal() throws HiveSQLException {
    setState(OperationState.PENDING);
    final HiveConf opConfig = getConfigForOperation();
    prepare(opConfig);
    if (!shouldRunAsync()) {
      runQuery(opConfig);
    } else {
      // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
      final SessionState parentSessionState = SessionState.get();
      // ThreadLocal Hive object needs to be set in background thread.
      // The metastore client in Hive is associated with right user.
      final Hive parentHive = getSessionHive();
      // Current UGI will get used by metastore when metsatore is in embedded mode
      // So this needs to get passed to the new background thread
      final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
      // Runnable impl to call runInternal asynchronously,
      // from a different thread
      Runnable backgroundOperation = new Runnable() {
        @Override
        public void run() {
          PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws HiveSQLException {
              Hive.set(parentHive);
              SessionState.setCurrentSessionState(parentSessionState);
              // Set current OperationLog in this async thread for keeping on saving query log.
              registerCurrentOperationLog();
              try {
                runQuery(opConfig);
              } catch (HiveSQLException e) {
                setOperationException(e);
                LOG.error("Error running hive query: ", e);
              } finally {
                unregisterOperationLog();
              }
              return null;
            }
          };

          try {
            currentUGI.doAs(doAsAction);
          } catch (Exception e) {
            setOperationException(new HiveSQLException(e));
            LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
          }
          finally {
            /**
             * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
             * when this thread is garbage collected later.
             * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
             */
            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
              ThreadWithGarbageCleanup currentThread =
                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
              currentThread.cacheThreadLocalRawStore();
            }
          }
        }
      };
      try {
        // This submit blocks if no background threads are available to run this operation
        Future<?> backgroundHandle =
            getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
        setBackgroundHandle(backgroundHandle);
      } catch (RejectedExecutionException rejected) {
        setState(OperationState.ERROR);
        throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected);
      }
    }
  }
{code}

Obviously, registerOperationLog is the key point that Hive can produce and return operation log to client.

But, in Spark SQL, SparkExecuteStatementOperation doesn't registerOperationLog before execute sql statement:

{code:scala}
  override def runInternal(): Unit = {
    setState(OperationState.PENDING)
    setHasResultSet(true) // avoid no resultset for async run

    if (!runInBackground) {
      execute()
    } else {
      val sparkServiceUGI = Utils.getUGI()

      // Runnable impl to call runInternal asynchronously,
      // from a different thread
      val backgroundOperation = new Runnable() {

        override def run(): Unit = {
          val doAsAction = new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              try {
                execute()
              } catch {
                case e: HiveSQLException =>
                  setOperationException(e)
                  log.error("Error running hive query: ", e)
              }
            }
          }

          try {
            sparkServiceUGI.doAs(doAsAction)
          } catch {
            case e: Exception =>
              setOperationException(new HiveSQLException(e))
              logError("Error running hive query as user : " +
                sparkServiceUGI.getShortUserName(), e)
          }
        }
      }
      try {
        // This submit blocks if no background threads are available to run this operation
        val backgroundHandle =
          parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          setState(OperationState.ERROR)
          throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected)
        case NonFatal(e) =>
          logError(s"Error executing query in background", e)
          setState(OperationState.ERROR)
          throw e
      }
    }
  }
{code}

LogDrivertAppender append logOutput into operation log file depends on current thread local operationLog:

{code:java}
  @Override
  protected void subAppend(LoggingEvent event) {
    super.subAppend(event);
    // That should've gone into our writer. Notify the LogContext.
    String logOutput = writer.toString();
    writer.reset();

    OperationLog log = operationManager.getOperationLogByThread();
    if (log == null) {
      LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
      return;
    }
    log.writeOperationLog(logOutput);
  }
{code}




> Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-21395
>                 URL: https://issues.apache.org/jira/browse/SPARK-21395
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: Chaozhong Yang
>
> In HiveServer2, TFetchResultsReq has a member which is named as `fetchType`. If fetchType is equal to be `1`, the thrift server should return operation log to client. However, we found Spark SQL's thrift server always return nothing to client for  TFetchResultsReq with fetchType(1). We 
>  have checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that there were existed operation log files with zero bytes(empty file). Why? Let's take a look at SQLOperation.java in Hive:
> {code:java}
>   @Override
>   public void runInternal() throws HiveSQLException {
>     setState(OperationState.PENDING);
>     final HiveConf opConfig = getConfigForOperation();
>     prepare(opConfig);
>     if (!shouldRunAsync()) {
>       runQuery(opConfig);
>     } else {
>       // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
>       final SessionState parentSessionState = SessionState.get();
>       // ThreadLocal Hive object needs to be set in background thread.
>       // The metastore client in Hive is associated with right user.
>       final Hive parentHive = getSessionHive();
>       // Current UGI will get used by metastore when metsatore is in embedded mode
>       // So this needs to get passed to the new background thread
>       final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
>       // Runnable impl to call runInternal asynchronously,
>       // from a different thread
>       Runnable backgroundOperation = new Runnable() {
>         @Override
>         public void run() {
>           PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
>             @Override
>             public Object run() throws HiveSQLException {
>               Hive.set(parentHive);
>               SessionState.setCurrentSessionState(parentSessionState);
>               // Set current OperationLog in this async thread for keeping on saving query log.
>               registerCurrentOperationLog();
>               try {
>                 runQuery(opConfig);
>               } catch (HiveSQLException e) {
>                 setOperationException(e);
>                 LOG.error("Error running hive query: ", e);
>               } finally {
>                 unregisterOperationLog();
>               }
>               return null;
>             }
>           };
>           try {
>             currentUGI.doAs(doAsAction);
>           } catch (Exception e) {
>             setOperationException(new HiveSQLException(e));
>             LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
>           }
>           finally {
>             /**
>              * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
>              * when this thread is garbage collected later.
>              * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
>              */
>             if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
>               ThreadWithGarbageCleanup currentThread =
>                   (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
>               currentThread.cacheThreadLocalRawStore();
>             }
>           }
>         }
>       };
>       try {
>         // This submit blocks if no background threads are available to run this operation
>         Future<?> backgroundHandle =
>             getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
>         setBackgroundHandle(backgroundHandle);
>       } catch (RejectedExecutionException rejected) {
>         setState(OperationState.ERROR);
>         throw new HiveSQLException("The background threadpool cannot accept" +
>             " new task for execution, please retry the operation", rejected);
>       }
>     }
>   }
> {code}
> Obviously, registerOperationLog is the key point that Hive can produce and return operation log to client.
> But, in Spark SQL, SparkExecuteStatementOperation doesn't registerOperationLog before execute sql statement:
> {code:scala}
>   override def runInternal(): Unit = {
>     setState(OperationState.PENDING)
>     setHasResultSet(true) // avoid no resultset for async run
>     if (!runInBackground) {
>       execute()
>     } else {
>       val sparkServiceUGI = Utils.getUGI()
>       // Runnable impl to call runInternal asynchronously,
>       // from a different thread
>       val backgroundOperation = new Runnable() {
>         override def run(): Unit = {
>           val doAsAction = new PrivilegedExceptionAction[Unit]() {
>             override def run(): Unit = {
>               try {
>                 execute()
>               } catch {
>                 case e: HiveSQLException =>
>                   setOperationException(e)
>                   log.error("Error running hive query: ", e)
>               }
>             }
>           }
>           try {
>             sparkServiceUGI.doAs(doAsAction)
>           } catch {
>             case e: Exception =>
>               setOperationException(new HiveSQLException(e))
>               logError("Error running hive query as user : " +
>                 sparkServiceUGI.getShortUserName(), e)
>           }
>         }
>       }
>       try {
>         // This submit blocks if no background threads are available to run this operation
>         val backgroundHandle =
>           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>         setBackgroundHandle(backgroundHandle)
>       } catch {
>         case rejected: RejectedExecutionException =>
>           setState(OperationState.ERROR)
>           throw new HiveSQLException("The background threadpool cannot accept" +
>             " new task for execution, please retry the operation", rejected)
>         case NonFatal(e) =>
>           logError(s"Error executing query in background", e)
>           setState(OperationState.ERROR)
>           throw e
>       }
>     }
>   }
> {code}
> LogDrivertAppender append logOutput into operation log file depends on current thread local operationLog:
> {code:java}
>   @Override
>   protected void subAppend(LoggingEvent event) {
>     super.subAppend(event);
>     // That should've gone into our writer. Notify the LogContext.
>     String logOutput = writer.toString();
>     writer.reset();
>     OperationLog log = operationManager.getOperationLogByThread();
>     if (log == null) {
>       LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
>       return;
>     }
>     log.writeOperationLog(logOutput);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org