You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by GitBox <gi...@apache.org> on 2021/07/29 06:00:05 UTC

[GitHub] [incubator-kyuubi] yaooqinn commented on a change in pull request #873: Dump statementInfo into local file that the format is parquet

yaooqinn commented on a change in pull request #873:
URL: https://github.com/apache/incubator-kyuubi/pull/873#discussion_r678848536



##########
File path: externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
##########
@@ -69,22 +98,38 @@ object KyuubiStatementMonitor extends Logging{
    */
   // TODO: Lack size type threshold and time type threshold
   def putStatementInfoIntoQueue(kyuubiStatementInfo: KyuubiStatementInfo): Unit = {
-    if (kyuubiStatementQueue.size() >= maxSize) {
-      removeAndDumpStatementInfoFromQueue()
+    if (dumpEnable && kyuubiStatementQueue.size() >= maxSize) {
+      dumpStatementInfoFromQueue()
+      val isSuccess = kyuubiStatementQueue.add(kyuubiStatementInfo)
+      info(s"Add kyuubiStatementInfo into queue is [$isSuccess], " +
+        s"statementId is [${kyuubiStatementInfo.statementId}]")
     }
-    val isSuccess = kyuubiStatementQueue.add(kyuubiStatementInfo)
-    info(s"Add kyuubiStatementInfo into queue is [$isSuccess], " +
-      s"statementId is [${kyuubiStatementInfo.statementId}]")
   }
 
   /**
-   * This method is used for removing kyuubiStatementInfo from blockingQueue(statementQueue)
-   * and dumpping them to a file by threshold.
+   * This method is used for dumpping kyuubiStatementInfo to a file by threshold.
    */
-  // TODO: Need ensure those items have finished. If not, we should put them into this queue again.
-  private def removeAndDumpStatementInfoFromQueue(): Unit = {
-    // TODO: Just for test
-    kyuubiStatementQueue.clear()
+  private def dumpStatementInfoFromQueue(): Unit = kyuubiStatementQueue.synchronized {
+    var statementInfoList = new util.ArrayList[KyuubiStatementInfo]()
+    val size = kyuubiStatementQueue.size()
+    (0 to size-1).foreach { _ =>
+      statementInfoList.add(kyuubiStatementQueue.poll())
+    }
+    if (DataFrameUtil.getStructType(classOf[KyuubiStatementInfo]).nonEmpty) {

Review comment:
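       Could the schema be obtained with `org.apache.spark.sql.Encoders.product[xxx].schema` (where `xxx` is the statement info class) instead of calling `DataFrameUtil.getStructType`?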




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org