You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/05/14 19:11:13 UTC
[spark] branch branch-3.0 updated: Revert "[SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener"
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cf708f9 Revert "[SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener"
cf708f9 is described below
commit cf708f970b640722062b3102b8757a554aa0c841
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu May 14 12:06:13 2020 -0700
Revert "[SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener"
This reverts commit 512cb2f0246a0d020f0ba726b4596555b15797c6.
---
.../ui/HiveThriftServer2Listener.scala | 120 +++++++++------------
.../hive/thriftserver/HiveSessionImplSuite.scala | 73 -------------
.../ui/HiveThriftServer2ListenerSuite.scala | 16 ---
.../hive/service/cli/session/HiveSessionImpl.java | 6 +-
.../hive/service/cli/session/HiveSessionImpl.java | 6 +-
5 files changed, 51 insertions(+), 170 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 20a8f2c..6d0a506 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hive.service.server.HiveServer2
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
import org.apache.spark.scheduler._
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -39,7 +38,7 @@ private[thriftserver] class HiveThriftServer2Listener(
kvstore: ElementTrackingStore,
sparkConf: SparkConf,
server: Option[HiveServer2],
- live: Boolean = true) extends SparkListener with Logging {
+ live: Boolean = true) extends SparkListener {
private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()
@@ -132,81 +131,60 @@ private[thriftserver] class HiveThriftServer2Listener(
updateLiveStore(session)
}
- private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit =
- Option(sessionList.get(e.sessionId)) match {
- case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
- case Some(sessionData) =>
- val session = sessionData
- session.finishTimestamp = e.finishTime
- updateStoreWithTriggerEnabled(session)
- sessionList.remove(e.sessionId)
- }
+ private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
+ val session = sessionList.get(e.sessionId)
+ session.finishTimestamp = e.finishTime
+ updateStoreWithTriggerEnabled(session)
+ sessionList.remove(e.sessionId)
+ }
- private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit =
- Option(sessionList.get(e.sessionId)) match {
- case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}")
- case Some(sessionData) =>
- val info = getOrCreateExecution(
- e.id,
- e.statement,
- e.sessionId,
- e.startTime,
- e.userName)
-
- info.state = ExecutionState.STARTED
- executionList.put(e.id, info)
- sessionData.totalExecution += 1
- executionList.get(e.id).groupId = e.groupId
- updateLiveStore(executionList.get(e.id))
- updateLiveStore(sessionData)
- }
+ private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
+ val info = getOrCreateExecution(
+ e.id,
+ e.statement,
+ e.sessionId,
+ e.startTime,
+ e.userName)
+
+ info.state = ExecutionState.STARTED
+ executionList.put(e.id, info)
+ sessionList.get(e.sessionId).totalExecution += 1
+ executionList.get(e.id).groupId = e.groupId
+ updateLiveStore(executionList.get(e.id))
+ updateLiveStore(sessionList.get(e.sessionId))
+ }
- private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit =
- Option(executionList.get(e.id)) match {
- case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
- case Some(executionData) =>
- executionData.executePlan = e.executionPlan
- executionData.state = ExecutionState.COMPILED
- updateLiveStore(executionData)
- }
+ private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = {
+ executionList.get(e.id).executePlan = e.executionPlan
+ executionList.get(e.id).state = ExecutionState.COMPILED
+ updateLiveStore(executionList.get(e.id))
+ }
- private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
- Option(executionList.get(e.id)) match {
- case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
- case Some(executionData) =>
- executionData.finishTimestamp = e.finishTime
- executionData.state = ExecutionState.CANCELED
- updateLiveStore(executionData)
- }
+ private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = {
+ executionList.get(e.id).finishTimestamp = e.finishTime
+ executionList.get(e.id).state = ExecutionState.CANCELED
+ updateLiveStore(executionList.get(e.id))
+ }
- private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
- Option(executionList.get(e.id)) match {
- case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
- case Some(executionData) =>
- executionData.finishTimestamp = e.finishTime
- executionData.detail = e.errorMsg
- executionData.state = ExecutionState.FAILED
- updateLiveStore(executionData)
- }
+ private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = {
+ executionList.get(e.id).finishTimestamp = e.finishTime
+ executionList.get(e.id).detail = e.errorMsg
+ executionList.get(e.id).state = ExecutionState.FAILED
+ updateLiveStore(executionList.get(e.id))
+ }
- private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
- Option(executionList.get(e.id)) match {
- case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
- case Some(executionData) =>
- executionData.finishTimestamp = e.finishTime
- executionData.state = ExecutionState.FINISHED
- updateLiveStore(executionData)
- }
+ private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = {
+ executionList.get(e.id).finishTimestamp = e.finishTime
+ executionList.get(e.id).state = ExecutionState.FINISHED
+ updateLiveStore(executionList.get(e.id))
+ }
- private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
- Option(executionList.get(e.id)) match {
- case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
- case Some(executionData) =>
- executionData.closeTimestamp = e.closeTime
- executionData.state = ExecutionState.CLOSED
- updateStoreWithTriggerEnabled(executionData)
- executionList.remove(e.id)
- }
+ private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = {
+ executionList.get(e.id).closeTimestamp = e.closeTime
+ executionList.get(e.id).state = ExecutionState.CLOSED
+ updateStoreWithTriggerEnabled(executionList.get(e.id))
+ executionList.remove(e.id)
+ }
// Update both live and history stores. Trigger is enabled by default, hence
// it will cleanup the entity which exceeds the threshold.
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
deleted file mode 100644
index 05d540d..0000000
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive.thriftserver
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hive.service.cli.OperationHandle
-import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager}
-import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager}
-import org.mockito.Mockito.{mock, verify, when}
-import org.mockito.invocation.InvocationOnMock
-
-import org.apache.spark.SparkFunSuite
-
-class HiveSessionImplSuite extends SparkFunSuite {
- private var session: HiveSessionImpl = _
- private var operationManager: OperationManager = _
-
- override def beforeAll() {
- super.beforeAll()
-
- session = new HiveSessionImpl(
- ThriftserverShimUtils.testedProtocolVersions.head,
- "",
- "",
- new HiveConf(),
- ""
- )
- val sessionManager = mock(classOf[SessionManager])
- session.setSessionManager(sessionManager)
- operationManager = mock(classOf[OperationManager])
- session.setOperationManager(operationManager)
- when(operationManager.newGetCatalogsOperation(session)).thenAnswer(
- (_: InvocationOnMock) => {
- val operation = mock(classOf[GetCatalogsOperation])
- when(operation.getHandle).thenReturn(mock(classOf[OperationHandle]))
- operation
- }
- )
-
- session.open(Map.empty[String, String].asJava)
- }
-
- test("SPARK-31387 - session.close() closes all sessions regardless of thrown exceptions") {
- val operationHandle1 = session.getCatalogs
- val operationHandle2 = session.getCatalogs
-
- when(operationManager.closeOperation(operationHandle1))
- .thenThrow(classOf[NullPointerException])
- when(operationManager.closeOperation(operationHandle2))
- .thenThrow(classOf[NullPointerException])
-
- session.close()
-
- verify(operationManager).closeOperation(operationHandle1)
- verify(operationManager).closeOperation(operationHandle2)
- }
-}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index ea2523d..075032f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -140,22 +140,6 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(listener.noLiveData())
}
- test("SPARK-31387 - listener update methods should not throw exception with unknown input") {
- val (statusStore: HiveThriftServer2AppStatusStore,
- listener: HiveThriftServer2Listener) = createAppStatusStore(true)
- val unknownSession = "unknown_session"
- val unknownOperation = "unknown_operation"
- listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
- listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession,
- "stmt", "groupId", 0))
- listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
- listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
- listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
- "msg", "trace", 0))
- listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
- listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0))
- }
-
private def createProperties: Properties = {
val properties = new Properties()
properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 3e2c3de..745f385 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -636,11 +636,7 @@ public class HiveSessionImpl implements HiveSession {
acquire(true);
// Iterate through the opHandles and close their operations
for (OperationHandle opHandle : opHandleSet) {
- try {
- operationManager.closeOperation(opHandle);
- } catch (Exception e) {
- LOG.warn("Exception is thrown closing operation " + opHandle, e);
- }
+ operationManager.closeOperation(opHandle);
}
opHandleSet.clear();
// Cleanup session log directory.
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 5cdae00..14e9c47 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -650,11 +650,7 @@ public class HiveSessionImpl implements HiveSession {
acquire(true);
// Iterate through the opHandles and close their operations
for (OperationHandle opHandle : opHandleSet) {
- try {
- operationManager.closeOperation(opHandle);
- } catch (Exception e) {
- LOG.warn("Exception is thrown closing operation " + opHandle, e);
- }
+ operationManager.closeOperation(opHandle);
}
opHandleSet.clear();
// Cleanup session log directory.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org