You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/05/20 17:34:28 UTC

[spark] branch branch-3.0 updated: [SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5198b68  [SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener
5198b68 is described below

commit 5198b6853b2e3bc69fc013c653aa163c79168366
Author: Ali Smesseim <al...@databricks.com>
AuthorDate: Wed May 20 10:30:17 2020 -0700

    [SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener
    
    ### What changes were proposed in this pull request?
    
    This is a recreation of #28155, which was reverted due to causing test failures.
    
    The update methods in HiveThriftServer2Listener now check whether the given session/operation ID actually exists in `sessionList` and `executionList`, respectively. This prevents NullPointerExceptions when the session or operation ID is unknown; instead, a warning is written to the log.
    
    To improve robustness, we also make the following changes in HiveSessionImpl.close():
    
    - Catch any exception thrown by `operationManager.closeOperation`, so that a failure while closing one operation does not prevent the remaining operations from being closed.
    - Handle an inaccessible scratch directory. When closing, all `.pipeout` files are removed from the scratch directory; previously this resulted in an NPE if the directory did not exist or could not be listed.
    
    ### Why are the changes needed?
    
    The listener's update methods would throw an exception if the operation or session ID was unknown. In Spark 2, where the listener is called directly, that exception propagates to the caller and alters its control flow. In Spark 3, the exception is caught by the ListenerBus, but it surfaces only as an uninformative NullPointerException.
    
    In HiveSessionImpl.close(), if an exception is thrown while closing an operation, none of the subsequent operations are closed.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #28544 from alismess-db/hive-thriftserver-listener-update-safer-2.
    
    Authored-by: Ali Smesseim <al...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit d40ecfa3f7b0102063075cafd54451f082ce38e1)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../ui/HiveThriftServer2Listener.scala             | 106 +++++++++++++--------
 .../hive/thriftserver/HiveSessionImplSuite.scala   |  73 ++++++++++++++
 .../ui/HiveThriftServer2ListenerSuite.scala        |  17 ++++
 .../hive/service/cli/session/HiveSessionImpl.java  |  20 ++--
 .../hive/service/cli/session/HiveSessionImpl.java  |  20 ++--
 5 files changed, 183 insertions(+), 53 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 6d0a506..6b7e5ee 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -38,7 +39,7 @@ private[thriftserver] class HiveThriftServer2Listener(
     kvstore: ElementTrackingStore,
     sparkConf: SparkConf,
     server: Option[HiveServer2],
-    live: Boolean = true) extends SparkListener {
+    live: Boolean = true) extends SparkListener with Logging {
 
   private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
   private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()
@@ -131,60 +132,83 @@ private[thriftserver] class HiveThriftServer2Listener(
     updateLiveStore(session)
   }
 
-  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
-    val session = sessionList.get(e.sessionId)
-    session.finishTimestamp = e.finishTime
-    updateStoreWithTriggerEnabled(session)
-    sessionList.remove(e.sessionId)
-  }
+  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit =
+    Option(sessionList.get(e.sessionId)) match {
+      case Some(sessionData) =>
+        sessionData.finishTimestamp = e.finishTime
+        updateStoreWithTriggerEnabled(sessionData)
+        sessionList.remove(e.sessionId)
+      case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
+    }
 
   private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
-    val info = getOrCreateExecution(
+    val executionData = getOrCreateExecution(
       e.id,
       e.statement,
       e.sessionId,
       e.startTime,
       e.userName)
 
-    info.state = ExecutionState.STARTED
-    executionList.put(e.id, info)
-    sessionList.get(e.sessionId).totalExecution += 1
-    executionList.get(e.id).groupId = e.groupId
-    updateLiveStore(executionList.get(e.id))
-    updateLiveStore(sessionList.get(e.sessionId))
+    executionData.state = ExecutionState.STARTED
+    executionList.put(e.id, executionData)
+    executionData.groupId = e.groupId
+    updateLiveStore(executionData)
+
+    Option(sessionList.get(e.sessionId)) match {
+      case Some(sessionData) =>
+        sessionData.totalExecution += 1
+        updateLiveStore(sessionData)
+      case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}." +
+        " Regardless, the operation has been registered.")
+    }
   }
 
-  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = {
-    executionList.get(e.id).executePlan = e.executionPlan
-    executionList.get(e.id).state = ExecutionState.COMPILED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit =
+    Option(executionList.get(e.id)) match {
+      case Some(executionData) =>
+        executionData.executePlan = e.executionPlan
+        executionData.state = ExecutionState.COMPILED
+        updateLiveStore(executionData)
+      case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
+    }
 
-  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = {
-    executionList.get(e.id).finishTimestamp = e.finishTime
-    executionList.get(e.id).state = ExecutionState.CANCELED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
+    Option(executionList.get(e.id)) match {
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.finishTime
+        executionData.state = ExecutionState.CANCELED
+        updateLiveStore(executionData)
+      case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
+    }
 
-  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = {
-    executionList.get(e.id).finishTimestamp = e.finishTime
-    executionList.get(e.id).detail = e.errorMsg
-    executionList.get(e.id).state = ExecutionState.FAILED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
+    Option(executionList.get(e.id)) match {
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.finishTime
+        executionData.detail = e.errorMsg
+        executionData.state = ExecutionState.FAILED
+        updateLiveStore(executionData)
+      case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
+    }
 
-  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = {
-    executionList.get(e.id).finishTimestamp = e.finishTime
-    executionList.get(e.id).state = ExecutionState.FINISHED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
+    Option(executionList.get(e.id)) match {
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.finishTime
+        executionData.state = ExecutionState.FINISHED
+        updateLiveStore(executionData)
+      case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
+    }
 
-  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = {
-    executionList.get(e.id).closeTimestamp = e.closeTime
-    executionList.get(e.id).state = ExecutionState.CLOSED
-    updateStoreWithTriggerEnabled(executionList.get(e.id))
-    executionList.remove(e.id)
-  }
+  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
+    Option(executionList.get(e.id)) match {
+      case Some(executionData) =>
+        executionData.closeTimestamp = e.closeTime
+        executionData.state = ExecutionState.CLOSED
+        updateStoreWithTriggerEnabled(executionData)
+        executionList.remove(e.id)
+      case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
+    }
 
   // Update both live and history stores. Trigger is enabled by default, hence
   // it will cleanup the entity which exceeds the threshold.
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
new file mode 100644
index 0000000..05d540d
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.service.cli.OperationHandle
+import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager}
+import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager}
+import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.invocation.InvocationOnMock
+
+import org.apache.spark.SparkFunSuite
+
+class HiveSessionImplSuite extends SparkFunSuite {
+  private var session: HiveSessionImpl = _
+  private var operationManager: OperationManager = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    session = new HiveSessionImpl(
+      ThriftserverShimUtils.testedProtocolVersions.head,
+      "",
+      "",
+      new HiveConf(),
+      ""
+    )
+    val sessionManager = mock(classOf[SessionManager])
+    session.setSessionManager(sessionManager)
+    operationManager = mock(classOf[OperationManager])
+    session.setOperationManager(operationManager)
+    when(operationManager.newGetCatalogsOperation(session)).thenAnswer(
+      (_: InvocationOnMock) => {
+        val operation = mock(classOf[GetCatalogsOperation])
+        when(operation.getHandle).thenReturn(mock(classOf[OperationHandle]))
+        operation
+      }
+    )
+
+    session.open(Map.empty[String, String].asJava)
+  }
+
+  test("SPARK-31387 - session.close() closes all operations regardless of thrown exceptions") {
+    val operationHandle1 = session.getCatalogs
+    val operationHandle2 = session.getCatalogs
+
+    when(operationManager.closeOperation(operationHandle1))
+      .thenThrow(classOf[NullPointerException])
+    when(operationManager.closeOperation(operationHandle2))
+      .thenThrow(classOf[NullPointerException])
+
+    session.close()
+
+    verify(operationManager).closeOperation(operationHandle1)
+    verify(operationManager).closeOperation(operationHandle2)
+  }
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index 075032f..9a9f574 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -140,6 +140,23 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(listener.noLiveData())
   }
 
+  test("SPARK-31387 - listener update methods should not throw exception with unknown input") {
+    val (statusStore: HiveThriftServer2AppStatusStore, listener: HiveThriftServer2Listener) =
+      createAppStatusStore(true)
+
+    val unknownSession = "unknown_session"
+    val unknownOperation = "unknown_operation"
+    listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession,
+      "stmt", "groupId", 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
+    listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
+      "msg", "trace", 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0))
+  }
+
   private def createProperties: Properties = {
     val properties = new Properties()
     properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 745f385..e3fb54d 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -636,7 +636,11 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        operationManager.closeOperation(opHandle);
+        try {
+          operationManager.closeOperation(opHandle);
+        } catch (Exception e) {
+          LOG.warn("Exception is thrown closing operation " + opHandle, e);
+        }
       }
       opHandleSet.clear();
       // Cleanup session log directory.
@@ -674,11 +678,15 @@ public class HiveSessionImpl implements HiveSession {
     File[] fileAry = new File(lScratchDir).listFiles(
             (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
 
-    for (File file : fileAry) {
-      try {
-        FileUtils.forceDelete(file);
-      } catch (Exception e) {
-        LOG.error("Failed to cleanup pipeout file: " + file, e);
+    if (fileAry == null) {
+      LOG.error("Unable to access pipeout files in " + lScratchDir);
+    } else {
+      for (File file : fileAry) {
+        try {
+          FileUtils.forceDelete(file);
+        } catch (Exception e) {
+          LOG.error("Failed to cleanup pipeout file: " + file, e);
+        }
       }
     }
   }
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 14e9c47..1b3e8fe 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -650,7 +650,11 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        operationManager.closeOperation(opHandle);
+        try {
+          operationManager.closeOperation(opHandle);
+        } catch (Exception e) {
+          LOG.warn("Exception is thrown closing operation " + opHandle, e);
+        }
       }
       opHandleSet.clear();
       // Cleanup session log directory.
@@ -688,11 +692,15 @@ public class HiveSessionImpl implements HiveSession {
     File[] fileAry = new File(lScratchDir).listFiles(
             (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
 
-    for (File file : fileAry) {
-      try {
-        FileUtils.forceDelete(file);
-      } catch (Exception e) {
-        LOG.error("Failed to cleanup pipeout file: " + file, e);
+    if (fileAry == null) {
+      LOG.error("Unable to access pipeout files in " + lScratchDir);
+    } else {
+      for (File file : fileAry) {
+        try {
+          FileUtils.forceDelete(file);
+        } catch (Exception e) {
+          LOG.error("Failed to cleanup pipeout file: " + file, e);
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org