You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "zhoukang (JIRA)" <ji...@apache.org> on 2019/01/28 11:35:00 UTC

[jira] [Updated] (SPARK-26751) HiveSessionImpl might have memory leak since Operation do not close properly

     [ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhoukang updated SPARK-26751:
-----------------------------
    Description: 
When a statement runs in the background and throws an exception that is not a HiveSQLException,
we may encounter a memory leak because its entry in handleToOperation is never removed.
The reason is as follows:
1. When calling operation.run, an exception is thrown that is not a HiveSQLException.
2. As a result, the opHandle is not added to opHandleSet, and operationManager.closeOperation(opHandle) is not called.
{code:java}
 private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
        this.acquire(true);
        OperationManager operationManager = this.getOperationManager();
        ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
        OperationHandle opHandle = operation.getHandle();

        OperationHandle e;
        try {
            operation.run();
            this.opHandleSet.add(opHandle);
            e = opHandle;
        } catch (HiveSQLException var11) {
            operationManager.closeOperation(opHandle);
            throw var11;
        } finally {
            this.release(true);
        }

        return e;
    }


      try {
        // This submit blocks if no background threads are available to run this operation
        val backgroundHandle =
          parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          setState(OperationState.ERROR)
          throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected)
        case NonFatal(e) =>
          logError(s"Error executing query in background", e)
          setState(OperationState.ERROR)
          throw e
      }
    }
{code}
3. When we close the session, operationManager.closeOperation(opHandle) is not called for this operation either, since its opHandle was never added to opHandleSet.
{code}
public void close() throws HiveSQLException {
        try {
            this.acquire(true);
            Iterator ioe = this.opHandleSet.iterator();

            while(ioe.hasNext()) {
                OperationHandle opHandle = (OperationHandle)ioe.next();
                this.operationManager.closeOperation(opHandle);
            }

            this.opHandleSet.clear();
            this.cleanupSessionLogDir();
            this.cleanupPipeoutFile();
            HiveHistory ioe1 = this.sessionState.getHiveHistory();
            if(null != ioe1) {
                ioe1.closeStream();
            }

            try {
                this.sessionState.close();
            } finally {
                this.sessionState = null;
            }
        } catch (IOException var17) {
            throw new HiveSQLException("Failure to close", var17);
        } finally {
            if(this.sessionState != null) {
                try {
                    this.sessionState.close();
                } catch (Throwable var15) {
                    LOG.warn("Error closing session", var15);
                }

                this.sessionState = null;
            }

            this.release(true);
        }

    }
{code}
4. However, the opHandle is added to handleToOperation for every statement, so the entry for the failed operation is never removed.
{code}
val handleToOperation = ReflectionUtils
    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      s" initialized or had already closed.")
    val conf = sqlContext.sessionState.conf
    val hiveSessionState = parentSession.getSessionState
    setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
    setConfMap(conf, hiveSessionState.getHiveVariables)
    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
      runInBackground)(sqlContext, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
    operation
  }
{code}

Below is an example that exhibits the memory leak:


  was:
When a statement runs in the background and throws an exception that is not a HiveSQLException,
we may encounter a memory leak because its entry in handleToOperation is never removed.
The reason is as follows:
1. When calling operation.run, an exception is thrown that is not a HiveSQLException.
2. As a result, the opHandle is not added to opHandleSet, and operationManager.closeOperation(opHandle) is not called.
{code:java}
 private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
        this.acquire(true);
        OperationManager operationManager = this.getOperationManager();
        ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
        OperationHandle opHandle = operation.getHandle();

        OperationHandle e;
        try {
            operation.run();
            this.opHandleSet.add(opHandle);
            e = opHandle;
        } catch (HiveSQLException var11) {
            operationManager.closeOperation(opHandle);
            throw var11;
        } finally {
            this.release(true);
        }

        return e;
    }


      try {
        // This submit blocks if no background threads are available to run this operation
        val backgroundHandle =
          parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          setState(OperationState.ERROR)
          throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected)
        case NonFatal(e) =>
          logError(s"Error executing query in background", e)
          setState(OperationState.ERROR)
          throw e
      }
    }
{code}
3. When we close the session, operationManager.closeOperation(opHandle) is not called for this operation either, since its opHandle was never added to opHandleSet.
{code}
public void close() throws HiveSQLException {
        try {
            this.acquire(true);
            Iterator ioe = this.opHandleSet.iterator();

            while(ioe.hasNext()) {
                OperationHandle opHandle = (OperationHandle)ioe.next();
                this.operationManager.closeOperation(opHandle);
            }

            this.opHandleSet.clear();
            this.cleanupSessionLogDir();
            this.cleanupPipeoutFile();
            HiveHistory ioe1 = this.sessionState.getHiveHistory();
            if(null != ioe1) {
                ioe1.closeStream();
            }

            try {
                this.sessionState.close();
            } finally {
                this.sessionState = null;
            }
        } catch (IOException var17) {
            throw new HiveSQLException("Failure to close", var17);
        } finally {
            if(this.sessionState != null) {
                try {
                    this.sessionState.close();
                } catch (Throwable var15) {
                    LOG.warn("Error closing session", var15);
                }

                this.sessionState = null;
            }

            this.release(true);
        }

    }
{code}
4. However, the opHandle is added to handleToOperation for every statement, so the entry for the failed operation is never removed.
{code}
val handleToOperation = ReflectionUtils
    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      s" initialized or had already closed.")
    val conf = sqlContext.sessionState.conf
    val hiveSessionState = parentSession.getSessionState
    setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
    setConfMap(conf, hiveSessionState.getHiveVariables)
    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
      runInBackground)(sqlContext, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
    operation
  }
{code}




> HiveSessionImpl might have memory leak since Operation do not close properly
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-26751
>                 URL: https://issues.apache.org/jira/browse/SPARK-26751
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: zhoukang
>            Priority: Major
>
> When a statement runs in the background and throws an exception that is not a HiveSQLException,
> we may encounter a memory leak because its entry in handleToOperation is never removed.
> The reason is as follows:
> 1. When calling operation.run, an exception is thrown that is not a HiveSQLException.
> 2. As a result, the opHandle is not added to opHandleSet, and operationManager.closeOperation(opHandle) is not called.
> {code:java}
>  private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
>         this.acquire(true);
>         OperationManager operationManager = this.getOperationManager();
>         ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
>         OperationHandle opHandle = operation.getHandle();
>         OperationHandle e;
>         try {
>             operation.run();
>             this.opHandleSet.add(opHandle);
>             e = opHandle;
>         } catch (HiveSQLException var11) {
>             operationManager.closeOperation(opHandle);
>             throw var11;
>         } finally {
>             this.release(true);
>         }
>         return e;
>     }
>       try {
>         // This submit blocks if no background threads are available to run this operation
>         val backgroundHandle =
>           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>         setBackgroundHandle(backgroundHandle)
>       } catch {
>         case rejected: RejectedExecutionException =>
>           setState(OperationState.ERROR)
>           throw new HiveSQLException("The background threadpool cannot accept" +
>             " new task for execution, please retry the operation", rejected)
>         case NonFatal(e) =>
>           logError(s"Error executing query in background", e)
>           setState(OperationState.ERROR)
>           throw e
>       }
>     }
> {code}
> 3. When we close the session, operationManager.closeOperation(opHandle) is not called for this operation either, since its opHandle was never added to opHandleSet.
> {code}
> public void close() throws HiveSQLException {
>         try {
>             this.acquire(true);
>             Iterator ioe = this.opHandleSet.iterator();
>             while(ioe.hasNext()) {
>                 OperationHandle opHandle = (OperationHandle)ioe.next();
>                 this.operationManager.closeOperation(opHandle);
>             }
>             this.opHandleSet.clear();
>             this.cleanupSessionLogDir();
>             this.cleanupPipeoutFile();
>             HiveHistory ioe1 = this.sessionState.getHiveHistory();
>             if(null != ioe1) {
>                 ioe1.closeStream();
>             }
>             try {
>                 this.sessionState.close();
>             } finally {
>                 this.sessionState = null;
>             }
>         } catch (IOException var17) {
>             throw new HiveSQLException("Failure to close", var17);
>         } finally {
>             if(this.sessionState != null) {
>                 try {
>                     this.sessionState.close();
>                 } catch (Throwable var15) {
>                     LOG.warn("Error closing session", var15);
>                 }
>                 this.sessionState = null;
>             }
>             this.release(true);
>         }
>     }
> {code}
> 4. However, the opHandle is added to handleToOperation for every statement, so the entry for the failed operation is never removed.
> {code}
> val handleToOperation = ReflectionUtils
>     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
>   val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
>   val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
>   override def newExecuteStatementOperation(
>       parentSession: HiveSession,
>       statement: String,
>       confOverlay: JMap[String, String],
>       async: Boolean): ExecuteStatementOperation = synchronized {
>     val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
>     require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
>       s" initialized or had already closed.")
>     val conf = sqlContext.sessionState.conf
>     val hiveSessionState = parentSession.getSessionState
>     setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
>     setConfMap(conf, hiveSessionState.getHiveVariables)
>     val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
>     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
>       runInBackground)(sqlContext, sessionToActivePool)
>     handleToOperation.put(operation.getHandle, operation)
>     logDebug(s"Created Operation for $statement with session=$parentSession, " +
>       s"runInBackground=$runInBackground")
>     operation
>   }
> {code}
> Below is an example that exhibits the memory leak:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org