You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/07/14 11:26:00 UTC

[incubator-kyuubi] branch master updated: Event Tracking: For statement (#767)

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ff52b20  Event Tracking:  For statement (#767)
ff52b20 is described below

commit ff52b203e0226b70afb9762b12147b86cfb3285a
Author: Yuxiang Zhang <zh...@126.com>
AuthorDate: Wed Jul 14 19:25:51 2021 +0800

    Event Tracking:  For statement (#767)
    
    Generate KStatement info. This object includes the following elements:
    1. statement
    2. statementId
    3. appId
    4. sessionId
    5. executionId
    6. physicalPlan
    7. stateTime: contains each state and the time it occurred
    
    This data is packaged in KStatement, and those objects are stored in memory.
    You can get some summary data from this object.
---
 externals/kyuubi-spark-monitor/pom.xml             | 20 +++++
 .../spark/monitor/KyuubiStatementMonitor.scala     | 66 +++++++++++++++++
 .../spark/monitor/entity/KyuubiStatementInfo.scala | 48 ++++++++++++
 .../src/test/resources/log4j.properties            | 40 ++++++++++
 externals/kyuubi-spark-sql-engine/pom.xml          |  2 +-
 .../engine/spark/operation/ExecuteStatement.scala  | 20 +++++
 .../engine/spark/KyuubiStatementMonitorSuite.scala | 86 ++++++++++++++++++++++
 7 files changed, 281 insertions(+), 1 deletion(-)

diff --git a/externals/kyuubi-spark-monitor/pom.xml b/externals/kyuubi-spark-monitor/pom.xml
index 92e0283..8c60a52 100644
--- a/externals/kyuubi-spark-monitor/pom.xml
+++ b/externals/kyuubi-spark-monitor/pom.xml
@@ -31,6 +31,26 @@
     <packaging>jar</packaging>
     <name>Kyuubi Project Spark Monitor</name>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
     <build>
         <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
         <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
diff --git a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
new file mode 100644
index 0000000..f33fbe9
--- /dev/null
+++ b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.monitor
+
+import java.util.concurrent.ArrayBlockingQueue
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
+
+// TODO: Thread safety needs further consideration beyond the synchronized insert below.
+object KyuubiStatementMonitor extends Logging {
+
+  /**
+   * This blocking queue stores kyuubiStatementInfo.
+   *
+   * Notice:
+   *    1. When we remove items from this queue, we should ensure those statements have finished.
+   *       If not, we should put them into this queue again.
+   *    2. There are two kinds of threshold that trigger removing items from this queue:
+   *      a. time
+   *      b. this queue's current size
+   */
+  // TODO: Capacity should be configurable
+  private val kyuubiStatementQueue = new ArrayBlockingQueue[KyuubiStatementInfo](10)
+
+  /**
+   * Puts kyuubiStatementInfo into kyuubiStatementQueue on a best-effort basis.
+   * The queue size is checked first; at the size threshold (7) the queue is drained.
+   * Uses `offer`, which returns false (logged) rather than throwing when the queue is full.
+   * @param kyuubiStatementInfo the statement information to record
+   */
+  // TODO: Lack size type threshold and time type threshold
+  def putStatementInfoIntoQueue(kyuubiStatementInfo: KyuubiStatementInfo): Unit = synchronized {
+    if (kyuubiStatementQueue.size() >= 7) {
+      removeAndDumpStatementInfoFromQueue()
+    }
+    val isSuccess = kyuubiStatementQueue.offer(kyuubiStatementInfo)
+    info(s"Add kyuubiStatementInfo into queue is [$isSuccess], " +
+      s"statementId is [${kyuubiStatementInfo.statementId}]")
+  }
+
+  /**
+   * Removes kyuubiStatementInfo items from kyuubiStatementQueue once a threshold is reached.
+   * Dumping them to a file is intended but not implemented yet: the queue is simply cleared.
+   */
+  // TODO: Ensure those items have finished. If not, put them into this queue again.
+  private def removeAndDumpStatementInfoFromQueue(): Unit = {
+    // TODO: Just for test
+    kyuubiStatementQueue.clear()
+  }
+}
diff --git a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiStatementInfo.scala b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiStatementInfo.scala
new file mode 100644
index 0000000..2b61cee
--- /dev/null
+++ b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiStatementInfo.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.monitor.entity
+
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.cli.HandleIdentifier
+import org.apache.kyuubi.operation.OperationState.OperationState
+
+/**
+ * Stores the summary information of a single statement.
+ * Intended so that statementId can be used to look up the metrics of its jobs or stages.
+ * @param statementId the id of the statement
+ * @param statement the statement text
+ * @param appId the id of the Spark application the statement runs in
+ * @param sessionId the identifier of the session that owns the statement
+ * @param stateToTime each state this statement has entered, mapped to the time it occurred
+ * `queryExecution` (physical plan, logical plan, ...) and `exception` are mutable fields
+ * that remain null until they are assigned.
+ */
+case class KyuubiStatementInfo(
+    statementId: String,
+    statement: String,
+    appId: String,
+    sessionId: HandleIdentifier,
+    stateToTime: Map[OperationState, Long]) {
+
+  var queryExecution: QueryExecution = null
+  var exception: KyuubiSQLException = null
+}
diff --git a/externals/kyuubi-spark-monitor/src/test/resources/log4j.properties b/externals/kyuubi-spark-monitor/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fee75dc
--- /dev/null
+++ b/externals/kyuubi-spark-monitor/src/test/resources/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+#Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = FATAL
+
+#File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
+
+# Set the threshold of File Appender to DEBUG
+log4j.appender.FA.Threshold = DEBUG
+
+# SPARK-34128: Suppress undesirable TTransportException warnings (THRIFT-4805) on console CA
+log4j.appender.CA.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.CA.filter.1.StringToMatch=Thrift error occurred during processing of message
+log4j.appender.CA.filter.1.AcceptOnMatch=false
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index 3bf12ee..f9b0e77 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -49,7 +49,7 @@
             <artifactId>kyuubi-spark-monitor</artifactId>
             <version>${project.version}</version>
         </dependency>
-      
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 27b897a..76bf7c9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.engine.spark.operation
 
 import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
 
+import scala.collection.mutable.Map
+
 import org.apache.spark.kyuubi.SQLOperationListener
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.types._
@@ -26,6 +28,8 @@ import org.apache.spark.sql.types._
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
+import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
+import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
 import org.apache.kyuubi.operation.{OperationState, OperationType}
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
@@ -58,6 +62,11 @@ class ExecuteStatement(
 
   private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
 
+  // sessionId must be the session's own handle; getTypeInfo appears to start a new operation
+  private val kyuubiStatementInfo = KyuubiStatementInfo(statementId, statement,
+    spark.sparkContext.applicationId, session.handle.identifier, Map(state -> lastAccessTime))
+  KyuubiStatementMonitor.putStatementInfoIntoQueue(kyuubiStatementInfo)
+
   override protected def resultSchema: StructType = {
     if (result == null || result.schema.isEmpty) {
       new StructType().add("Result", "string")
@@ -84,6 +93,7 @@ class ExecuteStatement(
       // TODO: Make it configurable
       spark.sparkContext.addSparkListener(operationListener)
       result = spark.sql(statement)
+      kyuubiStatementInfo.queryExecution = result.queryExecution
       debug(result.queryExecution)
       iter = new ArrayFetchIterator(result.collect())
       setState(OperationState.FINISHED)
@@ -156,6 +166,16 @@ class ExecuteStatement(
     spark.sparkContext.removeSparkListener(operationListener)
     super.cleanup(targetState)
   }
+
+  override def setState(newState: OperationState): Unit = {
+    super.setState(newState) // let the base operation transition first
+    kyuubiStatementInfo.stateToTime(newState) = lastAccessTime // then timestamp the new state
+  }
+
+  override def setOperationException(opEx: KyuubiSQLException): Unit = {
+    super.setOperationException(opEx)
+    kyuubiStatementInfo.exception = opEx // mirror the failure into the monitoring record
+  }
 }
 
 object ExecuteStatement {
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
new file mode 100644
index 0000000..f54ac52
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import java.util.concurrent.ArrayBlockingQueue
+
+import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TOperationHandle}
+import org.apache.hive.service.rpc.thrift.TCLIService.Iface
+import org.apache.hive.service.rpc.thrift.TOperationState._
+import org.scalatest.PrivateMethodTester
+
+import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
+import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
+import org.apache.kyuubi.operation.HiveJDBCTests
+
+class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
+    with PrivateMethodTester {
+
+  override protected def jdbcUrl: String = getJdbcUrl
+  override def withKyuubiConf: Map[String, String] = Map.empty
+
+  test("add kyuubiStatementInfo into queue and remove them by size type threshold") {
+    val sql = "select timestamp'2021-06-01'"
+    // Matches the size threshold in KyuubiStatementMonitor, so these do not trigger a drain yet
+    val total = 7
+    // Clear kyuubiStatementQueue first
+    val getQueue = PrivateMethod[
+      ArrayBlockingQueue[KyuubiStatementInfo]](Symbol("kyuubiStatementQueue"))()
+    val kyuubiStatementQueue = KyuubiStatementMonitor.invokePrivate(getQueue)
+    kyuubiStatementQueue.clear()
+    withSessionHandle { (client, handle) =>
+      // Runs `sql` once in this session and waits for the operation to complete
+      def executeAndWait(): Unit = {
+        val req = new TExecuteStatementReq()
+        req.setSessionHandle(handle)
+        req.setStatement(sql)
+        waitForOperationToComplete(client, client.ExecuteStatement(req).getOperationHandle)
+      }
+
+      (1 to total).foreach(_ => executeAndWait())
+      // Guard against the per-item checks below passing vacuously on an empty queue
+      assert(kyuubiStatementQueue.size() === total)
+
+      val iterator = kyuubiStatementQueue.iterator()
+      while (iterator.hasNext) {
+        val kyuubiStatementInfo = iterator.next()
+        assert(kyuubiStatementInfo.statement !== null)
+        assert(kyuubiStatementInfo.statementId !== null)
+        assert(kyuubiStatementInfo.sessionId !== null)
+        assert(kyuubiStatementInfo.queryExecution !== null)
+        // presumably one entry each for INITIALIZED, PENDING, RUNNING and FINISHED
+        assert(kyuubiStatementInfo.stateToTime.size === 4)
+      }
+
+      // Once the size threshold is reached, the next statement drains the queue before being
+      // inserted, so the queue cannot grow without bound (avoids a memory leak)
+      executeAndWait()
+      assert(kyuubiStatementQueue.size() === 1)
+    }
+  }
+
+  /** Polls the operation status until it is no longer initialized, pending or running. */
+  private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
+    val req = new TGetOperationStatusReq(op)
+    var state = client.GetOperationStatus(req).getOperationState
+    while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) {
+      Thread.sleep(10) // back off between polls instead of spinning on status RPCs
+      state = client.GetOperationStatus(req).getOperationState
+    }
+  }
+}