You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/07/14 11:26:00 UTC
[incubator-kyuubi] branch master updated: Event Tracking: For
statement (#767)
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ff52b20 Event Tracking: For statement (#767)
ff52b20 is described below
commit ff52b203e0226b70afb9762b12147b86cfb3285a
Author: Yuxiang Zhang <zh...@126.com>
AuthorDate: Wed Jul 14 19:25:51 2021 +0800
Event Tracking: For statement (#767)
Generate KStatement (KyuubiStatementInfo) objects. Each object includes the following elements:
1. statement
2. statementId
3. appId
4. sessionId
5. executionId
6. physicalPlan
7. stateToTime: each state the statement has entered and the time it occurred
This data is packaged in KStatement objects, which are kept in memory.
You can get summary data about a statement from its object.
---
externals/kyuubi-spark-monitor/pom.xml | 20 +++++
.../spark/monitor/KyuubiStatementMonitor.scala | 66 +++++++++++++++++
.../spark/monitor/entity/KyuubiStatementInfo.scala | 48 ++++++++++++
.../src/test/resources/log4j.properties | 40 ++++++++++
externals/kyuubi-spark-sql-engine/pom.xml | 2 +-
.../engine/spark/operation/ExecuteStatement.scala | 20 +++++
.../engine/spark/KyuubiStatementMonitorSuite.scala | 86 ++++++++++++++++++++++
7 files changed, 281 insertions(+), 1 deletion(-)
diff --git a/externals/kyuubi-spark-monitor/pom.xml b/externals/kyuubi-spark-monitor/pom.xml
index 92e0283..8c60a52 100644
--- a/externals/kyuubi-spark-monitor/pom.xml
+++ b/externals/kyuubi-spark-monitor/pom.xml
@@ -31,6 +31,26 @@
<packaging>jar</packaging>
<name>Kyuubi Project Spark Monitor</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
diff --git a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
new file mode 100644
index 0000000..f33fbe9
--- /dev/null
+++ b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.monitor
+
+import java.util.concurrent.ArrayBlockingQueue
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
+
+// Thread safety: `putStatementInfoIntoQueue` is synchronized so that the size check, the
+// eviction and the insertion happen atomically across concurrently created statements.
+object KyuubiStatementMonitor extends Logging {
+
+  /**
+   * This blockingQueue stores kyuubiStatementInfo.
+   *
+   * Notice:
+   * 1. When we remove items from this queue, we should ensure those statements have finished.
+   *    If not, we should put them into this queue again.
+   * 2. There are two kinds of threshold that trigger removing items from this queue:
+   *    a. time
+   *    b. this queue's current size
+   */
+  // TODO: Capacity should make configurable
+  private val kyuubiStatementQueue = new ArrayBlockingQueue[KyuubiStatementInfo](10)
+
+  /**
+   * Once the queue holds at least this many items, it is drained before the next insertion.
+   * It is kept below the queue capacity (10) so an insertion never meets a full queue.
+   */
+  // TODO: Size threshold should be made configurable
+  private val sizeThreshold = 7
+
+  /**
+   * Puts a kyuubiStatementInfo into the blockingQueue (kyuubiStatementQueue).
+   * Before every insertion the queue's current size is checked: if it has reached the size
+   * threshold, items are removed from the queue first.
+   *
+   * The check-evict-insert sequence runs under this object's lock. Without it, concurrent
+   * callers could all pass the size check and overflow the bounded queue, making `add` throw
+   * an IllegalStateException.
+   *
+   * @param kyuubiStatementInfo the statement info to record
+   */
+  // TODO: Lack time type threshold
+  def putStatementInfoIntoQueue(kyuubiStatementInfo: KyuubiStatementInfo): Unit = {
+    val isSuccess = synchronized {
+      if (kyuubiStatementQueue.size() >= sizeThreshold) {
+        removeAndDumpStatementInfoFromQueue()
+      }
+      kyuubiStatementQueue.add(kyuubiStatementInfo)
+    }
+    // Log outside the lock so slow logging does not serialize other statements.
+    info(s"Add kyuubiStatementInfo into queue is [$isSuccess], " +
+      s"statementId is [${kyuubiStatementInfo.statementId}]")
+  }
+
+  /**
+   * Removes kyuubiStatementInfo items from the blockingQueue (kyuubiStatementQueue) and is
+   * meant to dump them to a file. Currently it only clears the queue.
+   * Callers must hold this object's lock.
+   */
+  // TODO: Need ensure those items have finished. If not, we should put them into this queue again.
+  private def removeAndDumpStatementInfoFromQueue(): Unit = {
+    // TODO: Just for test
+    kyuubiStatementQueue.clear()
+  }
+}
diff --git a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiStatementInfo.scala b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiStatementInfo.scala
new file mode 100644
index 0000000..2b61cee
--- /dev/null
+++ b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiStatementInfo.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.monitor.entity
+
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.cli.HandleIdentifier
+import org.apache.kyuubi.operation.OperationState.OperationState
+
+/**
+ * Summary information about a single statement executed by the engine.
+ * The statementId can be used to look up the metrics of all jobs or stages that this
+ * statement produced.
+ *
+ * @param statementId id of the statement
+ * @param statement   the statement text
+ * @param appId       id of the application the statement ran in
+ * @param sessionId   handle identifier recorded for the owning session
+ * @param stateToTime every state the statement has entered, mapped to the time it occurred;
+ *                    mutable so that later state transitions can be appended
+ */
+case class KyuubiStatementInfo(
+    statementId: String,
+    statement: String,
+    appId: String,
+    sessionId: HandleIdentifier,
+    stateToTime: Map[OperationState, Long]) {
+
+  /** Exception recorded for this statement; null until one is set. */
+  var exception: KyuubiSQLException = _
+
+  /** Query execution (physical plan, logical plan, ...) of this statement; null until set. */
+  var queryExecution: QueryExecution = _
+}
diff --git a/externals/kyuubi-spark-monitor/src/test/resources/log4j.properties b/externals/kyuubi-spark-monitor/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fee75dc
--- /dev/null
+++ b/externals/kyuubi-spark-monitor/src/test/resources/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Log everything at DEBUG and above: the file appender (FA) writes to target/unit-tests.log,
+# while the console appender (CA) only shows FATAL messages.
+log4j.rootLogger=DEBUG, CA, FA
+
+#Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = FATAL
+
+#File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
+
+# Set the logger level of File Appender to DEBUG
+log4j.appender.FA.Threshold = DEBUG
+
+# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
+# (attached to CA, the console appender defined above)
+log4j.appender.CA.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.CA.filter.1.StringToMatch=Thrift error occurred during processing of message
+log4j.appender.CA.filter.1.AcceptOnMatch=false
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index 3bf12ee..f9b0e77 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -49,7 +49,7 @@
<artifactId>kyuubi-spark-monitor</artifactId>
<version>${project.version}</version>
</dependency>
-
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 27b897a..76bf7c9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
+import scala.collection.mutable.Map
+
import org.apache.spark.kyuubi.SQLOperationListener
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
@@ -26,6 +28,8 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
+import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
+import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
@@ -58,6 +62,11 @@ class ExecuteStatement(
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
+  // Monitoring record for this statement, registered when the operation is constructed.
+  // The session id is taken from the session's own handle: `session.getTypeInfo` would run a
+  // separate GetTypeInfo operation and yield that operation's handle id instead.
+  private val kyuubiStatementInfo = KyuubiStatementInfo(
+    statementId, statement, spark.sparkContext.applicationId,
+    session.handle.identifier, Map(state -> lastAccessTime))
+  KyuubiStatementMonitor.putStatementInfoIntoQueue(kyuubiStatementInfo)
+
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
@@ -84,6 +93,7 @@ class ExecuteStatement(
// TODO: Make it configurable
spark.sparkContext.addSparkListener(operationListener)
result = spark.sql(statement)
+ kyuubiStatementInfo.queryExecution = result.queryExecution
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
setState(OperationState.FINISHED)
@@ -156,6 +166,16 @@ class ExecuteStatement(
spark.sparkContext.removeSparkListener(operationListener)
super.cleanup(targetState)
}
+
+ /**
+  * Delegates the state transition to the parent operation, then records the new state and
+  * its timestamp in this statement's monitoring info.
+  */
+ override def setState(newState: OperationState): Unit = {
+ super.setState(newState)
+ // lastAccessTime is read after super.setState; presumably the parent refreshes it on each
+ // transition (TODO confirm in the parent operation), so this records the transition time.
+ kyuubiStatementInfo.stateToTime.put(newState, lastAccessTime)
+ }
+
+ /**
+  * Delegates to the parent operation, then also stores the exception in this statement's
+  * monitoring info so it is available alongside the other statement summary data.
+  */
+ override def setOperationException(opEx: KyuubiSQLException): Unit = {
+ super.setOperationException(opEx)
+ kyuubiStatementInfo.exception = opEx
+ }
}
object ExecuteStatement {
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
new file mode 100644
index 0000000..f54ac52
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import java.util.concurrent.ArrayBlockingQueue
+
+import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TOperationHandle}
+import org.apache.hive.service.rpc.thrift.TCLIService.Iface
+import org.apache.hive.service.rpc.thrift.TOperationState._
+import org.scalatest.PrivateMethodTester
+
+import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
+import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
+import org.apache.kyuubi.operation.HiveJDBCTests
+
+class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
+  with PrivateMethodTester {
+
+  override protected def jdbcUrl: String = getJdbcUrl
+  override def withKyuubiConf: Map[String, String] = Map.empty
+
+  test("add kyuubiStatementInfo into queue and remove them by size type threshold") {
+    val sql = "select timestamp'2021-06-01'"
+    // Mirrors the size threshold hard-coded in KyuubiStatementMonitor: once this many entries
+    // are queued, the next insertion clears the queue first.
+    val total: Int = 7
+    // Clear kyuubiStatementQueue first so entries left by other tests do not interfere.
+    val getQueue = PrivateMethod[
+      ArrayBlockingQueue[KyuubiStatementInfo]](Symbol("kyuubiStatementQueue"))()
+    val kyuubiStatementQueue = KyuubiStatementMonitor.invokePrivate(getQueue)
+    kyuubiStatementQueue.clear()
+    withSessionHandle { (client, handle) =>
+      // Submits `sql` on this session and blocks until the operation stops running.
+      def executeAndWait(): Unit = {
+        val req = new TExecuteStatementReq()
+        req.setSessionHandle(handle)
+        req.setStatement(sql)
+        val operationHandle = client.ExecuteStatement(req).getOperationHandle
+        waitForOperationToComplete(client, operationHandle)
+      }
+
+      (1 to total).foreach(_ => executeAndWait())
+
+      val iterator = kyuubiStatementQueue.iterator()
+      while (iterator.hasNext) {
+        val kyuubiStatementInfo = iterator.next()
+        assert(kyuubiStatementInfo.statement !== null)
+        assert(kyuubiStatementInfo.statementId !== null)
+        assert(kyuubiStatementInfo.sessionId !== null)
+        assert(kyuubiStatementInfo.queryExecution !== null)
+        assert(kyuubiStatementInfo.stateToTime.size === 4)
+      }
+
+      // One more statement reaches the size threshold: the queue is cleared before the new
+      // entry is added, which keeps it from growing without bound (avoids a memory leak).
+      executeAndWait()
+      assert(kyuubiStatementQueue.size() === 1)
+    }
+  }
+
+  /**
+   * Polls the operation status until it leaves the INITIALIZED/PENDING/RUNNING states.
+   * Sleeps briefly between polls instead of spinning, and fails the test rather than hanging
+   * forever if the operation does not finish within `timeoutMs`.
+   */
+  private def waitForOperationToComplete(
+      client: Iface,
+      op: TOperationHandle,
+      timeoutMs: Long = 60000L): Unit = {
+    val req = new TGetOperationStatusReq(op)
+    val deadline = System.currentTimeMillis() + timeoutMs
+    var state = client.GetOperationStatus(req).getOperationState
+    while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) {
+      if (System.currentTimeMillis() > deadline) {
+        fail(s"Operation did not complete within $timeoutMs ms, last state: $state")
+      }
+      Thread.sleep(10)
+      state = client.GetOperationStatus(req).getOperationState
+    }
+  }
+}