Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2019/02/03 14:48:00 UTC
[jira] [Assigned] (SPARK-26751) HiveSessionImpl might have memory leak since Operation do not close properly
[ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen reassigned SPARK-26751:
---------------------------------
Assignee: zhoukang
> HiveSessionImpl might have memory leak since Operation do not close properly
> ----------------------------------------------------------------------------
>
> Key: SPARK-26751
> URL: https://issues.apache.org/jira/browse/SPARK-26751
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: zhoukang
> Assignee: zhoukang
> Priority: Major
> Attachments: 26751.png
>
>
> When a statement runs in the background and fails with an exception that is not a HiveSQLException,
> we may encounter a memory leak, because its entry in handleToOperation is never removed.
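> The sequence of events is:
> 1. operation.run() throws an exception that is not a HiveSQLException (for example, the NonFatal case in SparkExecuteStatementOperation below rethrows the original exception unchanged).
> 2. Because executeStatementInternal only catches HiveSQLException, opHandle is never added to opHandleSet and operationManager.closeOperation(opHandle) is never called.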
> {code:java}
> // HiveSessionImpl.executeStatementInternal
> private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
>     this.acquire(true);
>     OperationManager operationManager = this.getOperationManager();
>     ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
>     OperationHandle opHandle = operation.getHandle();
>     OperationHandle e;
>     try {
>         operation.run();
>         // Reached only if run() succeeds
>         this.opHandleSet.add(opHandle);
>         e = opHandle;
>     } catch (HiveSQLException var11) {
>         // Only HiveSQLException triggers cleanup; any other exception skips closeOperation
>         operationManager.closeOperation(opHandle);
>         throw var11;
>     } finally {
>         this.release(true);
>     }
>     return e;
> }
> {code}
> {code:scala}
> // SparkExecuteStatementOperation.runInternal (excerpt)
> try {
>   // This submit blocks if no background threads are available to run this operation
>   val backgroundHandle =
>     parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>   setBackgroundHandle(backgroundHandle)
> } catch {
>   case rejected: RejectedExecutionException =>
>     setState(OperationState.ERROR)
>     throw new HiveSQLException("The background threadpool cannot accept" +
>       " new task for execution, please retry the operation", rejected)
>   case NonFatal(e) =>
>     logError(s"Error executing query in background", e)
>     setState(OperationState.ERROR)
>     // Rethrown unchanged: if e is not a HiveSQLException, the caller's catch block does not handle it
>     throw e
> }
> {code}
> 3. When the session is closed, close() calls operationManager.closeOperation(opHandle) only for the handles in opHandleSet. Since this opHandle was never added to opHandleSet, it is not closed here either.
> {code:java}
> // HiveSessionImpl.close()
> public void close() throws HiveSQLException {
>     try {
>         this.acquire(true);
>         // Only handles recorded in opHandleSet are closed
>         Iterator ioe = this.opHandleSet.iterator();
>         while (ioe.hasNext()) {
>             OperationHandle opHandle = (OperationHandle) ioe.next();
>             this.operationManager.closeOperation(opHandle);
>         }
>         this.opHandleSet.clear();
>         this.cleanupSessionLogDir();
>         this.cleanupPipeoutFile();
>         HiveHistory ioe1 = this.sessionState.getHiveHistory();
>         if (null != ioe1) {
>             ioe1.closeStream();
>         }
>         try {
>             this.sessionState.close();
>         } finally {
>             this.sessionState = null;
>         }
>     } catch (IOException var17) {
>         throw new HiveSQLException("Failure to close", var17);
>     } finally {
>         if (this.sessionState != null) {
>             try {
>                 this.sessionState.close();
>             } catch (Throwable var15) {
>                 LOG.warn("Error closing session", var15);
>             }
>             this.sessionState = null;
>         }
>         this.release(true);
>     }
> }
> {code}
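> 4. However, SparkSQLOperationManager.newExecuteStatementOperation adds the handle of every statement to handleToOperation, regardless of whether run() later succeeds. Since closeOperation is never called for the failed operation, its entry stays in the map.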
> {code:scala}
> // SparkSQLOperationManager (excerpt)
> val handleToOperation = ReflectionUtils
>   .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
> val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
> val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
> override def newExecuteStatementOperation(
>     parentSession: HiveSession,
>     statement: String,
>     confOverlay: JMap[String, String],
>     async: Boolean): ExecuteStatementOperation = synchronized {
>   val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
>   require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
>     s" initialized or had already closed.")
>   val conf = sqlContext.sessionState.conf
>   val hiveSessionState = parentSession.getSessionState
>   setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
>   setConfMap(conf, hiveSessionState.getHiveVariables)
>   val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
>   val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
>     runInBackground)(sqlContext, sessionToActivePool)
>   // Registered for every statement, before run() is called
>   handleToOperation.put(operation.getHandle, operation)
>   logDebug(s"Created Operation for $statement with session=$parentSession, " +
>     s"runInBackground=$runInBackground")
>   operation
> }
> {code}
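A minimal Java model of steps 1 to 4 may help make the leak concrete. It is a sketch under simplifying assumptions, not Spark or Hive code: FakeHiveSQLException, Body, OperationManager, Session and simulate() are hypothetical stand-ins, handles are plain integers, and the map plays the role of handleToOperation. Only the control flow described above is kept: a map entry is created for every statement, the handle joins opHandleSet only if run() succeeds, cleanup happens only in a catch for the HiveSQLException stand-in, and close() iterates opHandleSet alone.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class LeakModel {
    // Hypothetical stand-in for HiveSQLException.
    static class FakeHiveSQLException extends Exception {
        FakeHiveSQLException(String msg) { super(msg); }
    }

    // Body of operation.run(); may throw the checked stand-in or any RuntimeException.
    interface Body { void run() throws FakeHiveSQLException; }

    // Models the operation manager: an entry is added for every new operation.
    static class OperationManager {
        final Map<Integer, String> handleToOperation = new ConcurrentHashMap<>();
        private int nextHandle = 0;

        synchronized int newOperation(String statement) {
            int handle = nextHandle++;
            handleToOperation.put(handle, statement);
            return handle;
        }

        void closeOperation(int handle) {
            handleToOperation.remove(handle);
        }
    }

    // Models HiveSessionImpl: tracks only handles whose run() succeeded.
    static class Session {
        final OperationManager manager;
        final Set<Integer> opHandleSet = new HashSet<>();

        Session(OperationManager manager) { this.manager = manager; }

        int executeStatement(String statement, Body body) throws FakeHiveSQLException {
            int opHandle = manager.newOperation(statement);
            try {
                body.run();
                opHandleSet.add(opHandle); // skipped if run() throws
                return opHandle;
            } catch (FakeHiveSQLException e) {
                manager.closeOperation(opHandle); // cleanup only for this exception type
                throw e;
            }
        }

        void close() {
            // Closes only handles in opHandleSet, so a failed operation is never closed here.
            for (int h : opHandleSet) {
                manager.closeOperation(h);
            }
            opHandleSet.clear();
        }
    }

    // Runs a successful statement, a HiveSQLException-style failure and a
    // RuntimeException failure, closes the session, and returns what is left in the map.
    static Map<Integer, String> simulate() {
        OperationManager manager = new OperationManager();
        Session session = new Session(manager);
        try {
            session.executeStatement("ok", () -> { });
        } catch (FakeHiveSQLException ignored) { }
        try {
            session.executeStatement("hive-error", () -> { throw new FakeHiveSQLException("bad"); });
        } catch (FakeHiveSQLException ignored) { }
        try {
            session.executeStatement("runtime-error", () -> { throw new IllegalStateException("bad"); });
        } catch (FakeHiveSQLException | RuntimeException ignored) { }
        session.close();
        return manager.handleToOperation;
    }

    public static void main(String[] args) {
        System.out.println("left in handleToOperation: " + simulate().values());
    }
}
```

Running main prints `left in handleToOperation: [runtime-error]`: the successful operation is closed with the session, the HiveSQLException-style failure is closed by the catch block, and only the RuntimeException failure is left behind.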
> The attached screenshot shows an example of the leak:
> !26751.png!
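One possible way to avoid the leak is to make the cleanup independent of the exception type: register the operation, run it, and deregister it in a finally block guarded by a success flag. The sketch below uses hypothetical names (CloseOnAnyFailure, FakeHiveSQLException, Body, execute) and an integer handle standing in for OperationHandle; it is an illustration only and not necessarily the change adopted for SPARK-26751. Another option consistent with the analysis above is to wrap non-HiveSQLException errors in a HiveSQLException where they are rethrown, so the existing catch block in executeStatementInternal runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CloseOnAnyFailure {
    // Hypothetical stand-in for HiveSQLException.
    static class FakeHiveSQLException extends Exception {
        FakeHiveSQLException(String msg) { super(msg); }
    }

    // Body of operation.run().
    interface Body { void run() throws FakeHiveSQLException; }

    // Registers the operation, runs it, and removes the registration if run() fails
    // with any Throwable (checked exception, RuntimeException or Error).
    static int execute(Map<Integer, String> handleToOperation, int handle,
                       String statement, Body body) throws FakeHiveSQLException {
        handleToOperation.put(handle, statement);
        boolean succeeded = false;
        try {
            body.run();
            succeeded = true;
            return handle;
        } finally {
            if (!succeeded) {
                handleToOperation.remove(handle);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> handleToOperation = new ConcurrentHashMap<>();
        try {
            execute(handleToOperation, 0, "ok", () -> { });
            execute(handleToOperation, 1, "runtime-error", () -> { throw new IllegalStateException("bad"); });
        } catch (FakeHiveSQLException | RuntimeException e) {
            System.out.println("failed: " + e.getMessage());
        }
        // Only the successful operation remains registered; the session would close it later.
        System.out.println("registered: " + handleToOperation.values());
    }
}
```

The flag-plus-finally form also covers Errors, which a catch clause naming specific exception types would miss, while still propagating the original exception to the caller unchanged.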