You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2019/02/03 14:48:00 UTC

[jira] [Assigned] (SPARK-26751) HiveSessionImpl might have memory leak since Operation do not close properly

     [ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen reassigned SPARK-26751:
---------------------------------

    Assignee: zhoukang

> HiveSessionImpl might have memory leak since Operation do not close properly
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-26751
>                 URL: https://issues.apache.org/jira/browse/SPARK-26751
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: zhoukang
>            Assignee: zhoukang
>            Priority: Major
>         Attachments: 26751.png
>
>
> When we run in background and we get exception which is not HiveSQLException,
> we may encounter memory leak since handleToOperation will not removed correctly.
> The reason is below:
> 1. when calling operation.run we throw an exception which is not HiveSQLException
> 2. then opHandleSet will not add the opHandle, and operationManager.closeOperation(opHandle); will not be called
> {code:java}
>  private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
>         this.acquire(true);
>         OperationManager operationManager = this.getOperationManager();
>         ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
>         OperationHandle opHandle = operation.getHandle();
>         OperationHandle e;
>         try {
>             operation.run();
>             this.opHandleSet.add(opHandle);
>             e = opHandle;
>         } catch (HiveSQLException var11) {
>             operationManager.closeOperation(opHandle);
>             throw var11;
>         } finally {
>             this.release(true);
>         }
>         return e;
>     }
>       try {
>         // This submit blocks if no background threads are available to run this operation
>         val backgroundHandle =
>           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>         setBackgroundHandle(backgroundHandle)
>       } catch {
>         case rejected: RejectedExecutionException =>
>           setState(OperationState.ERROR)
>           throw new HiveSQLException("The background threadpool cannot accept" +
>             " new task for execution, please retry the operation", rejected)
>         case NonFatal(e) =>
>           logError(s"Error executing query in background", e)
>           setState(OperationState.ERROR)
>           throw e
>       }
>     }
> {code}
> 3. when we close the session we will also call operationManager.closeOperation(opHandle),since we did not add this opHandle into the opHandleSet.
> {code}
> public void close() throws HiveSQLException {
>         try {
>             this.acquire(true);
>             Iterator ioe = this.opHandleSet.iterator();
>             while(ioe.hasNext()) {
>                 OperationHandle opHandle = (OperationHandle)ioe.next();
>                 this.operationManager.closeOperation(opHandle);
>             }
>             this.opHandleSet.clear();
>             this.cleanupSessionLogDir();
>             this.cleanupPipeoutFile();
>             HiveHistory ioe1 = this.sessionState.getHiveHistory();
>             if(null != ioe1) {
>                 ioe1.closeStream();
>             }
>             try {
>                 this.sessionState.close();
>             } finally {
>                 this.sessionState = null;
>             }
>         } catch (IOException var17) {
>             throw new HiveSQLException("Failure to close", var17);
>         } finally {
>             if(this.sessionState != null) {
>                 try {
>                     this.sessionState.close();
>                 } catch (Throwable var15) {
>                     LOG.warn("Error closing session", var15);
>                 }
>                 this.sessionState = null;
>             }
>             this.release(true);
>         }
>     }
> {code}
> 4. however, the opHandle will added into handleToOperation for each statement
> {code}
> val handleToOperation = ReflectionUtils
>     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
>   val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
>   val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
>   override def newExecuteStatementOperation(
>       parentSession: HiveSession,
>       statement: String,
>       confOverlay: JMap[String, String],
>       async: Boolean): ExecuteStatementOperation = synchronized {
>     val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
>     require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
>       s" initialized or had already closed.")
>     val conf = sqlContext.sessionState.conf
>     val hiveSessionState = parentSession.getSessionState
>     setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
>     setConfMap(conf, hiveSessionState.getHiveVariables)
>     val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
>     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
>       runInBackground)(sqlContext, sessionToActivePool)
>     handleToOperation.put(operation.getHandle, operation)
>     logDebug(s"Created Operation for $statement with session=$parentSession, " +
>       s"runInBackground=$runInBackground")
>     operation
>   }
> {code}
> Below is an example which has memory leak:
>  !26751.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org