Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/07/11 18:37:45 UTC

[GitHub] [spark] rangadi commented on a diff in pull request #41752: [SPARK-44201][CONNECT][SS] Add support for Streaming Listener in Scala for Spark Connect

rangadi commented on code in PR #41752:
URL: https://github.com/apache/spark/pull/41752#discussion_r1260122470


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StreamingListenerPacket.scala:
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+case class StreamingListenerPacket(id: String, listener: AnyRef) extends Serializable

Review Comment:
   Can you add documentation? What is the second arg?
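   For example, a minimal Scaladoc sketch of what this could say (wording is just a suggestion, not the PR's actual text):

      /**
       * Wraps a client-generated listener ID together with the listener
       * instance, so the server can cache the listener and look it up
       * again by ID.
       *
       * @param id       unique ID the client assigns to this listener
       * @param listener the StreamingQueryListener instance being
       *                 registered; typed as AnyRef to keep the packet
       *                 itself easy to serialize across the wire
       */
      case class StreamingListenerPacket(id: String, listener: AnyRef) extends Serializable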



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -47,6 +48,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   // foreachBatch() in Streaming. Lazy since most sessions don't need it.
   private lazy val dataFrameCache: ConcurrentMap[String, DataFrame] = new ConcurrentHashMap()
 
+  private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] =

Review Comment:
   Should this be part of the StreamingQuery itself? It does not need to be part of the session.
   Also, when is this removed? 
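   A removal hook mirroring removeCachedDataFrame would be the natural shape for this; a sketch (method name is hypothetical, not in the PR as reviewed):

      /**
       * Removes and returns the listener cached under `id`, or null if none.
       */
      private[connect] def removeCachedListener(id: String): StreamingQueryListener = {
        listenerCache.remove(id)
      }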



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -152,6 +156,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def removeCachedDataFrame(dfId: String): DataFrame = {
     dataFrameCache.remove(dfId)
   }
+
+  /**
+   * Caches the given StreamingQueryListener under the given ID.
+   */
+  private[connect] def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = {
+    if (listenerCache.putIfAbsent(id, listener) != null) {
+      throw SparkException.internalError(s"A listener is already associated with id $id")
+    }
+  }
+
+  /**
+   * Returns the [[StreamingQueryListener]] cached for listener ID `id`. Throws
+   * [[InvalidPlanInput]] if it is not found.
+   */
+  private[connect] def getListenerOrThrow(id: String): StreamingQueryListener = {

Review Comment:
   There does not seem to be a 'remove()' method. When are these entries removed?
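   For comparison, a REMOVE_LISTENER handler in SparkConnectPlanner could look roughly like this, assuming the proto grows a command that carries the listener ID (the command case, field names, and removeCachedListener are all assumptions):

      case StreamingQueryManagerCommand.CommandCase.REMOVE_LISTENER =>
        // Look up the cached instance by the client-provided ID, detach it
        // from the query manager, then drop it from the session cache.
        val id = command.getRemoveListener.getId
        val listener = sessionHolder.getListenerOrThrow(id)
        session.streams.removeListener(listener)
        sessionHolder.removeCachedListener(id)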



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -292,3 +331,31 @@ class TestForeachWriter[T] extends ForeachWriter[T] {
 case class TestClass(value: Int) {
   override def toString: String = value.toString
 }
+
+class EventCollector extends StreamingQueryListener {
+  @volatile var startEvent: QueryStartedEvent = null
+  @volatile var terminationEvent: QueryTerminatedEvent = null
+  @volatile var idleEvent: QueryIdleEvent = null
+
+  private val _progressEvents = new mutable.Queue[StreamingQueryProgress]
+
+  def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
+    _progressEvents.clone().toSeq
+  }
+
+  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+    startEvent = event
+  }
+
+  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+    _progressEvents += event.progress

Review Comment:
   Could you update this to do a Spark operation, like spark.read() or spark.write()?
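   Something along these lines, for instance (a sketch; the subclass name and the exact operation are placeholders):

      // Variant of the test collector that runs a Spark operation from
      // inside a listener callback, to exercise that path end-to-end.
      class SparkOperationEventCollector(spark: SparkSession) extends EventCollector {
        override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
          super.onQueryProgress(event)
          // Any call that reaches the server would do; count() forces execution.
          spark.range(10).count()
        }
      }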



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2909,6 +2918,36 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         session.streams.resetTerminated()
         respBuilder.setResetTerminated(true)
 
+      case StreamingQueryManagerCommand.CommandCase.ADD_LISTENER =>
+        val listenerPacket = Utils
+          .deserialize[StreamingListenerPacket](
+            command.getAddListener.getListenerPayload.toByteArray,
+            Utils.getContextOrSparkClassLoader)
+        val listener: StreamingQueryListener = listenerPacket.listener
+          .asInstanceOf[StreamingQueryListener]
+        val id: String = listenerPacket.id
+        sessionHolder.cacheListenerById(id, listener)
+        session.streams.addListener(listener)

Review Comment:
   So, from this point on, this listener is treated as a legacy StreamingQueryListener, i.e. it is not a Connect-aware streaming listener. Is that fine? I wonder what happens if a user tries to run a Spark command inside the listener (I added a comment about this in the unit test).
   For this first version it might be fine, but I think we should have a clearly defined TODO for this improvement. cc: @WweiL
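   For example, the registration site could carry the TODO directly (a sketch; ticket reference intentionally left out):

      sessionHolder.cacheListenerById(id, listener)
      // TODO: this registers the deserialized instance as a plain server-side
      // listener, so Spark operations invoked from its callbacks will not go
      // through Connect. Revisit once Connect-aware listeners are supported.
      session.streams.addListener(listener)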



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.scheduler.SparkListenerEvent
  * @since 2.0.0
  */
 @Evolving
-abstract class StreamingQueryListener {
+abstract class StreamingQueryListener extends Serializable {

Review Comment:
   The client-side class is different from the server-side one, right?
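   Asking because the listener has to survive a serialization round trip; presumably the client side does something along these lines before sending the ADD_LISTENER command (a sketch; only Utils.serialize and the packet class are taken from the PR, the rest is assumed):

      // Hypothetical client-side counterpart of the deserialize call on the
      // server: wrap the listener with a fresh ID and ship it as bytes.
      val id = java.util.UUID.randomUUID().toString
      val payload: Array[Byte] =
        Utils.serialize(StreamingListenerPacket(id, listener))
      // `payload` then becomes the listener_payload field of the ADD_LISTENER
      // command (proto builder calls omitted).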



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -292,3 +331,31 @@ class TestForeachWriter[T] extends ForeachWriter[T] {
 case class TestClass(value: Int) {
   override def toString: String = value.toString
 }
+
+class EventCollector extends StreamingQueryListener {
+  @volatile var startEvent: QueryStartedEvent = null
+  @volatile var terminationEvent: QueryTerminatedEvent = null
+  @volatile var idleEvent: QueryIdleEvent = null
+
+  private val _progressEvents = new mutable.Queue[StreamingQueryProgress]
+
+  def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
+    _progressEvents.clone().toSeq
+  }
+
+  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+    startEvent = event
+  }
+
+  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+    _progressEvents += event.progress

Review Comment:
   If this is not feasible now, we could do it as a follow-up to this PR with a TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

