You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/05/14 19:11:13 UTC

[spark] branch branch-3.0 updated: Revert "[SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cf708f9  Revert "[SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener"
cf708f9 is described below

commit cf708f970b640722062b3102b8757a554aa0c841
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu May 14 12:06:13 2020 -0700

    Revert "[SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener"
    
    This reverts commit 512cb2f0246a0d020f0ba726b4596555b15797c6.
---
 .../ui/HiveThriftServer2Listener.scala             | 120 +++++++++------------
 .../hive/thriftserver/HiveSessionImplSuite.scala   |  73 -------------
 .../ui/HiveThriftServer2ListenerSuite.scala        |  16 ---
 .../hive/service/cli/session/HiveSessionImpl.java  |   6 +-
 .../hive/service/cli/session/HiveSessionImpl.java  |   6 +-
 5 files changed, 51 insertions(+), 170 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 20a8f2c..6d0a506 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -39,7 +38,7 @@ private[thriftserver] class HiveThriftServer2Listener(
     kvstore: ElementTrackingStore,
     sparkConf: SparkConf,
     server: Option[HiveServer2],
-    live: Boolean = true) extends SparkListener with Logging {
+    live: Boolean = true) extends SparkListener {
 
   private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
   private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()
@@ -132,81 +131,60 @@ private[thriftserver] class HiveThriftServer2Listener(
     updateLiveStore(session)
   }
 
-  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit =
-    Option(sessionList.get(e.sessionId)) match {
-      case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
-      case Some(sessionData) =>
-        val session = sessionData
-        session.finishTimestamp = e.finishTime
-        updateStoreWithTriggerEnabled(session)
-        sessionList.remove(e.sessionId)
-    }
+  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
+    val session = sessionList.get(e.sessionId)
+    session.finishTimestamp = e.finishTime
+    updateStoreWithTriggerEnabled(session)
+    sessionList.remove(e.sessionId)
+  }
 
-  private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit =
-    Option(sessionList.get(e.sessionId)) match {
-      case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}")
-      case Some(sessionData) =>
-        val info = getOrCreateExecution(
-          e.id,
-          e.statement,
-          e.sessionId,
-          e.startTime,
-          e.userName)
-
-        info.state = ExecutionState.STARTED
-        executionList.put(e.id, info)
-        sessionData.totalExecution += 1
-        executionList.get(e.id).groupId = e.groupId
-        updateLiveStore(executionList.get(e.id))
-        updateLiveStore(sessionData)
-    }
+  private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
+    val info = getOrCreateExecution(
+      e.id,
+      e.statement,
+      e.sessionId,
+      e.startTime,
+      e.userName)
+
+    info.state = ExecutionState.STARTED
+    executionList.put(e.id, info)
+    sessionList.get(e.sessionId).totalExecution += 1
+    executionList.get(e.id).groupId = e.groupId
+    updateLiveStore(executionList.get(e.id))
+    updateLiveStore(sessionList.get(e.sessionId))
+  }
 
-  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit =
-    Option(executionList.get(e.id)) match {
-      case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
-      case Some(executionData) =>
-        executionData.executePlan = e.executionPlan
-        executionData.state = ExecutionState.COMPILED
-        updateLiveStore(executionData)
-    }
+  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = {
+    executionList.get(e.id).executePlan = e.executionPlan
+    executionList.get(e.id).state = ExecutionState.COMPILED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
-    Option(executionList.get(e.id)) match {
-      case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.state = ExecutionState.CANCELED
-        updateLiveStore(executionData)
-    }
+  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).state = ExecutionState.CANCELED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
-    Option(executionList.get(e.id)) match {
-      case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.detail = e.errorMsg
-        executionData.state = ExecutionState.FAILED
-        updateLiveStore(executionData)
-    }
+  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).detail = e.errorMsg
+    executionList.get(e.id).state = ExecutionState.FAILED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
-    Option(executionList.get(e.id)) match {
-      case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.state = ExecutionState.FINISHED
-        updateLiveStore(executionData)
-    }
+  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).state = ExecutionState.FINISHED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
-    Option(executionList.get(e.id)) match {
-      case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
-      case Some(executionData) =>
-        executionData.closeTimestamp = e.closeTime
-        executionData.state = ExecutionState.CLOSED
-        updateStoreWithTriggerEnabled(executionData)
-        executionList.remove(e.id)
-    }
+  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = {
+    executionList.get(e.id).closeTimestamp = e.closeTime
+    executionList.get(e.id).state = ExecutionState.CLOSED
+    updateStoreWithTriggerEnabled(executionList.get(e.id))
+    executionList.remove(e.id)
+  }
 
   // Update both live and history stores. Trigger is enabled by default, hence
   // it will cleanup the entity which exceeds the threshold.
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
deleted file mode 100644
index 05d540d..0000000
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive.thriftserver
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hive.service.cli.OperationHandle
-import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager}
-import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager}
-import org.mockito.Mockito.{mock, verify, when}
-import org.mockito.invocation.InvocationOnMock
-
-import org.apache.spark.SparkFunSuite
-
-class HiveSessionImplSuite extends SparkFunSuite {
-  private var session: HiveSessionImpl = _
-  private var operationManager: OperationManager = _
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    session = new HiveSessionImpl(
-      ThriftserverShimUtils.testedProtocolVersions.head,
-      "",
-      "",
-      new HiveConf(),
-      ""
-    )
-    val sessionManager = mock(classOf[SessionManager])
-    session.setSessionManager(sessionManager)
-    operationManager = mock(classOf[OperationManager])
-    session.setOperationManager(operationManager)
-    when(operationManager.newGetCatalogsOperation(session)).thenAnswer(
-      (_: InvocationOnMock) => {
-        val operation = mock(classOf[GetCatalogsOperation])
-        when(operation.getHandle).thenReturn(mock(classOf[OperationHandle]))
-        operation
-      }
-    )
-
-    session.open(Map.empty[String, String].asJava)
-  }
-
-  test("SPARK-31387 - session.close() closes all sessions regardless of thrown exceptions") {
-    val operationHandle1 = session.getCatalogs
-    val operationHandle2 = session.getCatalogs
-
-    when(operationManager.closeOperation(operationHandle1))
-      .thenThrow(classOf[NullPointerException])
-    when(operationManager.closeOperation(operationHandle2))
-      .thenThrow(classOf[NullPointerException])
-
-    session.close()
-
-    verify(operationManager).closeOperation(operationHandle1)
-    verify(operationManager).closeOperation(operationHandle2)
-  }
-}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index ea2523d..075032f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -140,22 +140,6 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(listener.noLiveData())
   }
 
-  test("SPARK-31387 - listener update methods should not throw exception with unknown input") {
-    val (statusStore: HiveThriftServer2AppStatusStore,
-    listener: HiveThriftServer2Listener) = createAppStatusStore(true)
-    val unknownSession = "unknown_session"
-    val unknownOperation = "unknown_operation"
-    listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession,
-      "stmt", "groupId", 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
-    listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
-      "msg", "trace", 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0))
-  }
-
   private def createProperties: Properties = {
     val properties = new Properties()
     properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 3e2c3de..745f385 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -636,11 +636,7 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        try {
-          operationManager.closeOperation(opHandle);
-        } catch (Exception e) {
-          LOG.warn("Exception is thrown closing operation " + opHandle, e);
-        }
+        operationManager.closeOperation(opHandle);
       }
       opHandleSet.clear();
       // Cleanup session log directory.
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 5cdae00..14e9c47 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -650,11 +650,7 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        try {
-          operationManager.closeOperation(opHandle);
-        } catch (Exception e) {
-          LOG.warn("Exception is thrown closing operation " + opHandle, e);
-        }
+        operationManager.closeOperation(opHandle);
       }
       opHandleSet.clear();
       // Cleanup session log directory.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org