You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2020/05/12 16:18:05 UTC

[spark] branch branch-3.0 updated: [SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 512cb2f  [SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener
512cb2f is described below

commit 512cb2f0246a0d020f0ba726b4596555b15797c6
Author: Ali Smesseim <al...@databricks.com>
AuthorDate: Tue May 12 09:14:34 2020 -0700

    [SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener
    
    ### What changes were proposed in this pull request?
    
    The update methods in HiveThriftServer2Listener now check whether the given session/operation ID actually exists in `sessionList` or `executionList`, respectively. This prevents NullPointerExceptions if the session or operation ID is unknown; instead, a warning is written to the log.
    
    Also, in HiveSessionImpl.close(), we catch any exception thrown by `operationManager.closeOperation`. If for any reason this throws an exception, other operations are not prevented from being closed.
    
    ### Why are the changes needed?
    
    The listener's update methods would throw an exception if the operation or session ID is unknown. In Spark 2, where the listener is called directly, this interferes with the caller's control flow. In Spark 3, the exception is caught by the ListenerBus but results in an uninformative NullPointerException.
    
    In HiveSessionImpl.close(), if an exception is thrown when closing an operation, none of the subsequent operations are closed.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #28155 from alismess-db/hive-thriftserver-listener-update-safer.
    
    Authored-by: Ali Smesseim <al...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
    (cherry picked from commit 6994c64efd5770a8fd33220cbcaddc1d96fed886)
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../ui/HiveThriftServer2Listener.scala             | 120 ++++++++++++---------
 .../hive/thriftserver/HiveSessionImplSuite.scala   |  73 +++++++++++++
 .../ui/HiveThriftServer2ListenerSuite.scala        |  16 +++
 .../hive/service/cli/session/HiveSessionImpl.java  |   6 +-
 .../hive/service/cli/session/HiveSessionImpl.java  |   6 +-
 5 files changed, 170 insertions(+), 51 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 6d0a506..20a8f2c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -38,7 +39,7 @@ private[thriftserver] class HiveThriftServer2Listener(
     kvstore: ElementTrackingStore,
     sparkConf: SparkConf,
     server: Option[HiveServer2],
-    live: Boolean = true) extends SparkListener {
+    live: Boolean = true) extends SparkListener with Logging {
 
   private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
   private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()
@@ -131,60 +132,81 @@ private[thriftserver] class HiveThriftServer2Listener(
     updateLiveStore(session)
   }
 
-  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
-    val session = sessionList.get(e.sessionId)
-    session.finishTimestamp = e.finishTime
-    updateStoreWithTriggerEnabled(session)
-    sessionList.remove(e.sessionId)
-  }
+  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit =
+    Option(sessionList.get(e.sessionId)) match {
+      case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
+      case Some(sessionData) =>
+        val session = sessionData
+        session.finishTimestamp = e.finishTime
+        updateStoreWithTriggerEnabled(session)
+        sessionList.remove(e.sessionId)
+    }
 
-  private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
-    val info = getOrCreateExecution(
-      e.id,
-      e.statement,
-      e.sessionId,
-      e.startTime,
-      e.userName)
-
-    info.state = ExecutionState.STARTED
-    executionList.put(e.id, info)
-    sessionList.get(e.sessionId).totalExecution += 1
-    executionList.get(e.id).groupId = e.groupId
-    updateLiveStore(executionList.get(e.id))
-    updateLiveStore(sessionList.get(e.sessionId))
-  }
+  private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit =
+    Option(sessionList.get(e.sessionId)) match {
+      case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}")
+      case Some(sessionData) =>
+        val info = getOrCreateExecution(
+          e.id,
+          e.statement,
+          e.sessionId,
+          e.startTime,
+          e.userName)
+
+        info.state = ExecutionState.STARTED
+        executionList.put(e.id, info)
+        sessionData.totalExecution += 1
+        executionList.get(e.id).groupId = e.groupId
+        updateLiveStore(executionList.get(e.id))
+        updateLiveStore(sessionData)
+    }
 
-  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = {
-    executionList.get(e.id).executePlan = e.executionPlan
-    executionList.get(e.id).state = ExecutionState.COMPILED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit =
+    Option(executionList.get(e.id)) match {
+      case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
+      case Some(executionData) =>
+        executionData.executePlan = e.executionPlan
+        executionData.state = ExecutionState.COMPILED
+        updateLiveStore(executionData)
+    }
 
-  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = {
-    executionList.get(e.id).finishTimestamp = e.finishTime
-    executionList.get(e.id).state = ExecutionState.CANCELED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
+    Option(executionList.get(e.id)) match {
+      case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.finishTime
+        executionData.state = ExecutionState.CANCELED
+        updateLiveStore(executionData)
+    }
 
-  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = {
-    executionList.get(e.id).finishTimestamp = e.finishTime
-    executionList.get(e.id).detail = e.errorMsg
-    executionList.get(e.id).state = ExecutionState.FAILED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
+    Option(executionList.get(e.id)) match {
+      case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.finishTime
+        executionData.detail = e.errorMsg
+        executionData.state = ExecutionState.FAILED
+        updateLiveStore(executionData)
+    }
 
-  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = {
-    executionList.get(e.id).finishTimestamp = e.finishTime
-    executionList.get(e.id).state = ExecutionState.FINISHED
-    updateLiveStore(executionList.get(e.id))
-  }
+  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
+    Option(executionList.get(e.id)) match {
+      case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.finishTime
+        executionData.state = ExecutionState.FINISHED
+        updateLiveStore(executionData)
+    }
 
-  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = {
-    executionList.get(e.id).closeTimestamp = e.closeTime
-    executionList.get(e.id).state = ExecutionState.CLOSED
-    updateStoreWithTriggerEnabled(executionList.get(e.id))
-    executionList.remove(e.id)
-  }
+  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
+    Option(executionList.get(e.id)) match {
+      case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
+      case Some(executionData) =>
+        executionData.closeTimestamp = e.closeTime
+        executionData.state = ExecutionState.CLOSED
+        updateStoreWithTriggerEnabled(executionData)
+        executionList.remove(e.id)
+    }
 
   // Update both live and history stores. Trigger is enabled by default, hence
   // it will cleanup the entity which exceeds the threshold.
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
new file mode 100644
index 0000000..05d540d
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.service.cli.OperationHandle
+import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager}
+import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager}
+import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.invocation.InvocationOnMock
+
+import org.apache.spark.SparkFunSuite
+
+class HiveSessionImplSuite extends SparkFunSuite {
+  private var session: HiveSessionImpl = _
+  private var operationManager: OperationManager = _
+
+  override def beforeAll() {
+    super.beforeAll()
+
+    session = new HiveSessionImpl(
+      ThriftserverShimUtils.testedProtocolVersions.head,
+      "",
+      "",
+      new HiveConf(),
+      ""
+    )
+    val sessionManager = mock(classOf[SessionManager])
+    session.setSessionManager(sessionManager)
+    operationManager = mock(classOf[OperationManager])
+    session.setOperationManager(operationManager)
+    when(operationManager.newGetCatalogsOperation(session)).thenAnswer(
+      (_: InvocationOnMock) => {
+        val operation = mock(classOf[GetCatalogsOperation])
+        when(operation.getHandle).thenReturn(mock(classOf[OperationHandle]))
+        operation
+      }
+    )
+
+    session.open(Map.empty[String, String].asJava)
+  }
+
+  test("SPARK-31387 - session.close() closes all sessions regardless of thrown exceptions") {
+    val operationHandle1 = session.getCatalogs
+    val operationHandle2 = session.getCatalogs
+
+    when(operationManager.closeOperation(operationHandle1))
+      .thenThrow(classOf[NullPointerException])
+    when(operationManager.closeOperation(operationHandle2))
+      .thenThrow(classOf[NullPointerException])
+
+    session.close()
+
+    verify(operationManager).closeOperation(operationHandle1)
+    verify(operationManager).closeOperation(operationHandle2)
+  }
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index 075032f..ea2523d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -140,6 +140,22 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(listener.noLiveData())
   }
 
+  test("SPARK-31387 - listener update methods should not throw exception with unknown input") {
+    val (statusStore: HiveThriftServer2AppStatusStore,
+    listener: HiveThriftServer2Listener) = createAppStatusStore(true)
+    val unknownSession = "unknown_session"
+    val unknownOperation = "unknown_operation"
+    listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession,
+      "stmt", "groupId", 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
+    listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
+      "msg", "trace", 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0))
+  }
+
   private def createProperties: Properties = {
     val properties = new Properties()
     properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 745f385..3e2c3de 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -636,7 +636,11 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        operationManager.closeOperation(opHandle);
+        try {
+          operationManager.closeOperation(opHandle);
+        } catch (Exception e) {
+          LOG.warn("Exception is thrown closing operation " + opHandle, e);
+        }
       }
       opHandleSet.clear();
       // Cleanup session log directory.
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 14e9c47..5cdae00 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -650,7 +650,11 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        operationManager.closeOperation(opHandle);
+        try {
+          operationManager.closeOperation(opHandle);
+        } catch (Exception e) {
+          LOG.warn("Exception is thrown closing operation " + opHandle, e);
+        }
       }
       opHandleSet.clear();
       // Cleanup session log directory.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org