Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2023/04/19 10:02:43 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request, #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

HeartSaVioR opened a new pull request, #40845:
URL: https://github.com/apache/spark/pull/40845

   ### What changes were proposed in this pull request?
   
   This PR introduces a new callback "onQueryIdle" to StreamingQueryListener. The idle notification was previously delivered as part of the query progress update.
   
   The signature of the new callback method is below:
   
   ```
   def onQueryIdle(event: QueryIdleEvent): Unit
   
   class QueryIdleEvent(val id: UUID, val runId: UUID) extends Event
   ```
   
   This PR proposes a default (no-op) implementation of onQueryIdle in StreamingQueryListener, so that it does not break existing streaming query listener implementations in Scala/Java.
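The following minimal sketch shows why a default implementation keeps existing listeners working. It is written in plain Python rather than Scala so that it runs on its own. `ListenerBase`, `LegacyListener`, `dispatch` and the trimmed-down event classes are hypothetical stand-ins and are not Spark's API. Only the `id`/`run_id` fields of the idle event mirror the signature above.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class QueryProgressEvent:
    # Hypothetical stand-in; the real event wraps a full progress object.
    batch_id: int
    num_input_rows: int


@dataclass
class QueryIdleEvent:
    # Mirrors the proposed signature: only the query id and the run id.
    id: UUID
    run_id: UUID


class ListenerBase(ABC):
    """Hypothetical base class mirroring the Scala design."""

    @abstractmethod
    def on_query_progress(self, event: QueryProgressEvent) -> None: ...

    def on_query_idle(self, event: QueryIdleEvent) -> None:
        # Default no-op, like `def onQueryIdle(...): Unit = {}` in Scala,
        # so listeners written before this callback existed work unchanged.
        pass


class LegacyListener(ListenerBase):
    """Written before the idle callback existed: overrides progress only."""

    def __init__(self) -> None:
        self.progress_batches: list = []

    def on_query_progress(self, event: QueryProgressEvent) -> None:
        self.progress_batches.append(event.batch_id)


def dispatch(listener: ListenerBase, events: list) -> None:
    # Route each event to its callback, roughly as a listener bus would.
    for event in events:
        if isinstance(event, QueryIdleEvent):
            listener.on_query_idle(event)
        else:
            listener.on_query_progress(event)


legacy = LegacyListener()
dispatch(legacy, [QueryProgressEvent(0, 10), QueryIdleEvent(uuid4(), uuid4())])
print(legacy.progress_batches)  # [0]: the idle event hit the no-op default
```

The legacy listener never sees the idle event. That is the behavioral change described next: it no longer receives an idle-time progress update either.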
   
   Note that this is a behavioral change. Users will receive a different callback when the streaming query has been idle for the configured period of time (previously they received onQueryProgress). This is still worth doing, for the reasons described in the section "Why are the changes needed?".
   
   ### Why are the changes needed?
   
   Users have been confused by the query progress event emitted for an idle query. This is not only a matter of understanding; it has also led to various complaints, because users tend to assume the event is only emitted after a microbatch has finished. The misunderstanding can also cause data loss in monitoring. The progress event emitted on idle carries the latest batch ID, so a listener implementation that blindly upserts the information into external storage keyed by batch ID risks overwriting, and thus losing, the real metrics of that batch.
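Here is a toy illustration of the data-loss scenario above. It assumes a hypothetical metrics sink that upserts one row per batch ID, modelled as a Python dict. `upsert_by_batch_id` is made up for this sketch.

```python
def upsert_by_batch_id(store: dict, batch_id: int, num_input_rows: int) -> None:
    # Hypothetical sink: a blind upsert, so a later event with the same
    # batch ID silently overwrites the earlier row.
    store[batch_id] = num_input_rows


store: dict = {}

# Batch 5 finishes and reports its real metrics.
upsert_by_batch_id(store, 5, 1000)

# Pre-change behavior: while the query is idle, another progress event
# reuses batch ID 5 but with the input row count reset to 0.
upsert_by_batch_id(store, 5, 0)

print(store)  # {5: 0}: the 1000 rows recorded for batch 5 are gone
```

With a separate idle callback, the second upsert never happens unless the listener explicitly asks for it.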
   
   It also complicates the implementation, because we have to remember the execution of the previous batch, which is arguably unnecessary.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. After this change, users no longer receive a query progress event from an idle query. Instead, they receive a query idle event.
   
   ### How was this patch tested?
   
   Modified UTs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171858659


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -55,6 +55,12 @@ abstract class StreamingQueryListener {
    */
   def onQueryProgress(event: QueryProgressEvent): Unit
 
+  /**
+   * Called when the query is idle for a certain time period and waiting for new data to process.
+   * @since 3.5.0
+   */
+  def onQueryIdle(event: QueryIdleEvent): Unit = {}

Review Comment:
   Current behavior of Spark: if the streaming query has run batch N and is then stuck waiting for new data, Spark emits a progress update for batch N. The update is modified slightly so that the number of input rows is reset to 0, the number of output rows is reset to 0, and so on.
   
   This causes a lot of confusion for users, because we also have no-data batches, in which the number of input rows is also 0. You can distinguish the two by looking at the elapsed time info and seeing that there is no field for microbatch execution, but I wouldn't expect average users to know about this. In fact, the feedback I got is that users didn't even know we produce a progress update event on idle at all. This is honestly an awful UX.
   
   Arguably, re-sending the latest progress update with some of its values reset on idle is not useful at all. Users who want that can do it themselves. Why not avoid the confusion entirely while we can?
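The point that users can rebuild the old idle report themselves can be sketched as follows. A listener caches the latest progress and, when it is told the query is idle, re-publishes that progress with the row counts zeroed. `Progress`, `IdleAwareListener` and the simplified callback signatures are hypothetical and are not PySpark's listener API.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional


@dataclass(frozen=True)
class Progress:
    # Hypothetical, trimmed-down stand-in for the real progress object.
    batch_id: int
    num_input_rows: int
    num_output_rows: int


class IdleAwareListener:
    """Hypothetical listener that recreates the pre-change idle report."""

    def __init__(self, publish: Callable[[Progress], None]) -> None:
        self._publish = publish
        self._last: Optional[Progress] = None

    def on_query_progress(self, progress: Progress) -> None:
        # Remember the latest real progress, then forward it unchanged.
        self._last = progress
        self._publish(progress)

    def on_query_idle(self) -> None:
        # Re-send the latest batch with row counts reset to 0, roughly what
        # was emitted on idle before; nothing to send before the first batch.
        if self._last is not None:
            self._publish(replace(self._last, num_input_rows=0, num_output_rows=0))


published: list = []
listener = IdleAwareListener(published.append)
listener.on_query_progress(Progress(batch_id=3, num_input_rows=42, num_output_rows=42))
listener.on_query_idle()
print(published[-1])  # Progress(batch_id=3, num_input_rows=0, num_output_rows=0)
```

Keeping this in user code makes the re-sent report an explicit opt-in rather than a surprise.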





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171858659


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -55,6 +55,12 @@ abstract class StreamingQueryListener {
    */
   def onQueryProgress(event: QueryProgressEvent): Unit
 
+  /**
+   * Called when the query is idle for a certain time period and waiting for new data to process.
+   * @since 3.5.0
+   */
+  def onQueryIdle(event: QueryIdleEvent): Unit = {}

Review Comment:
   Current behavior of Spark: if the streaming query has run batch N and is then stuck waiting for new data, Spark emits a progress update for batch N. The update is modified slightly so that the number of input rows is reset to 0, the number of output rows is reset to 0, and so on.
   
   This causes a lot of confusion for users, because we also have no-data batches, in which the number of input rows is also 0. You can distinguish the two by looking at the elapsed time info and seeing that there is no field for microbatch execution (or by tracking which batch IDs have already been executed), but I wouldn't expect average users to know about this. In fact, the feedback I got is that users didn't even know we produce a progress update event on idle at all. This is honestly an awful UX.
   
   Arguably, re-sending the latest progress update with some of its values reset on idle is not useful at all. Users who want that can do it themselves. Why not avoid the confusion entirely while we can?
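The batch-ID tracking mentioned above can be sketched as a small heuristic for listeners that still receive the pre-change events. `BatchTracker` is hypothetical. The sketch assumes, as the comment describes, that the idle report reuses the ID of the last executed batch. It keeps every ID it has seen, which is acceptable for a sketch but grows without bound.

```python
class BatchTracker:
    """Hypothetical helper classifying pre-change progress events."""

    def __init__(self) -> None:
        self._seen: set = set()

    def classify(self, batch_id: int, num_input_rows: int) -> str:
        if batch_id in self._seen:
            # Same batch ID reported again: the idle re-report, not a batch.
            return "idle"
        self._seen.add(batch_id)
        # A new batch ID with 0 input rows is a genuine no-data batch.
        return "no-data batch" if num_input_rows == 0 else "data batch"


tracker = BatchTracker()
labels = [tracker.classify(b, n) for b, n in [(7, 500), (8, 0), (8, 0)]]
print(labels)  # ['data batch', 'no-data batch', 'idle']
```

The input row count alone cannot separate the last two events, because both report 0. Only the repeated batch ID gives the idle report away.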





[GitHub] [spark] rangadi commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1172018701


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -55,6 +55,12 @@ abstract class StreamingQueryListener {
    */
   def onQueryProgress(event: QueryProgressEvent): Unit
 
+  /**
+   * Called when the query is idle for a certain time period and waiting for new data to process.
+   * @since 3.5.0
+   */
+  def onQueryIdle(event: QueryIdleEvent): Unit = {}

Review Comment:
   Thanks @HeartSaVioR. 





[GitHub] [spark] viirya commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171716059


##########
python/pyspark/sql/streaming/listener.py:
##########
@@ -87,6 +91,14 @@ def onQueryProgress(self, event: "QueryProgressEvent") -> None:
         """
         pass
 
+    @abstractmethod
+    def onQueryIdle(self, event: "QueryIdleEvent") -> None:
+        """
+        Called when the query is idle for a certain time period and waiting for new data to

Review Comment:
   `a certain time period` sounds ambiguous.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171849181


##########
python/pyspark/sql/streaming/listener.py:
##########
@@ -87,6 +91,14 @@ def onQueryProgress(self, event: "QueryProgressEvent") -> None:
         """
         pass
 
+    @abstractmethod
+    def onQueryIdle(self, event: "QueryIdleEvent") -> None:
+        """
+        Called when the query is idle for a certain time period and waiting for new data to

Review Comment:
   Sigh, the config is internal, not public. I'll just shorten it to "idle", which implicitly carries the meaning "for some amount of time". I'll also update the config's doc so that it reflects the new behavior. Please let me know if you think it's better to make the config public instead.





[GitHub] [spark] rangadi commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171850573


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -55,6 +55,12 @@ abstract class StreamingQueryListener {
    */
   def onQueryProgress(event: QueryProgressEvent): Unit
 
+  /**
+   * Called when the query is idle for a certain time period and waiting for new data to process.
+   * @since 3.5.0
+   */
+  def onQueryIdle(event: QueryIdleEvent): Unit = {}

Review Comment:
   I am still not clear on the motivation for this. I haven't fully understood the problem with onQueryProgress().
   In any case, should the default implementation call onQueryProgress()? That would avoid changing the behavior for those who don't implement this.
   
   Or maybe add an 'isIdle' flag to the progress?





[GitHub] [spark] viirya commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1172170550


##########
python/pyspark/sql/streaming/listener.py:
##########
@@ -87,6 +91,14 @@ def onQueryProgress(self, event: "QueryProgressEvent") -> None:
         """
         pass
 
+    @abstractmethod
+    def onQueryIdle(self, event: "QueryIdleEvent") -> None:
+        """
+        Called when the query is idle for a certain time period and waiting for new data to

Review Comment:
   Doc change looks okay to me.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171834142


##########
python/pyspark/sql/streaming/listener.py:
##########
@@ -87,6 +91,14 @@ def onQueryProgress(self, event: "QueryProgressEvent") -> None:
         """
         pass
 
+    @abstractmethod
+    def onQueryIdle(self, event: "QueryIdleEvent") -> None:
+        """
+        Called when the query is idle for a certain time period and waiting for new data to

Review Comment:
   Yeah, maybe it's better to mention the config explicitly. It shouldn't be risky, as a config is another form of public API which we don't change. Will do.





[GitHub] [spark] HeartSaVioR commented on pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40845:
URL: https://github.com/apache/spark/pull/40845#issuecomment-1514474772

   cc. @HyukjinKwon 
   Could you please help look into this, especially the backward compatibility on the PySpark side? I don't see a way to do that in PySpark, but I'm not a Python expert, so I might be missing something. Thanks in advance!
   
   cc. @zsxwing @viirya @rangadi @jerrypeng 
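On the PySpark question: the quoted diff marks `onQueryIdle` with `@abstractmethod`. With Python's standard `abc` module, an existing subclass that does not override an abstract method can no longer be instantiated. The sketch below only demonstrates that standard `abc` behavior, using hypothetical class names. It does not show how PySpark itself resolved the question.

```python
from abc import ABC, abstractmethod


class AbstractIdleListener(ABC):
    # Like the quoted diff: the new callback is an @abstractmethod.
    @abstractmethod
    def onQueryProgress(self, event) -> None: ...

    @abstractmethod
    def onQueryIdle(self, event) -> None: ...


class DefaultIdleListener(ABC):
    # Alternative: only the pre-existing callback stays abstract.
    @abstractmethod
    def onQueryProgress(self, event) -> None: ...

    def onQueryIdle(self, event) -> None:
        pass  # no-op default, like the Scala side


def make_legacy(base):
    # A user listener written before onQueryIdle existed.
    class Legacy(base):
        def onQueryProgress(self, event) -> None:
            pass

    return Legacy


try:
    make_legacy(AbstractIdleListener)()
    abstract_ok = True
except TypeError:
    abstract_ok = False  # "Can't instantiate abstract class ..."

default_ok = make_legacy(DefaultIdleListener)() is not None
print(abstract_ok, default_ok)  # False True
```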




[GitHub] [spark] HeartSaVioR commented on pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40845:
URL: https://github.com/apache/spark/pull/40845#issuecomment-1515921604

   Thanks! Merging to master.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171858659


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -55,6 +55,12 @@ abstract class StreamingQueryListener {
    */
   def onQueryProgress(event: QueryProgressEvent): Unit
 
+  /**
+   * Called when the query is idle for a certain time period and waiting for new data to process.
+   * @since 3.5.0
+   */
+  def onQueryIdle(event: QueryIdleEvent): Unit = {}

Review Comment:
   Current behavior of Spark: if the streaming query has run batch N and is then stuck waiting for new data, Spark emits a progress update for batch N. The update is modified slightly so that the number of input rows is reset to 0, the number of output rows is reset to 0, and so on.
   
   This causes a lot of confusion for users, because we also have no-data batches, in which the number of input rows is also 0. You can distinguish the two by looking at the elapsed time info and seeing that there is no field for microbatch execution (or by tracking which batch IDs have already been executed), but I wouldn't expect average users to know about this. In fact, the feedback I got is that users didn't even know we produce a progress update event on idle at all. This is honestly an awful UX. The config that controls the time period is "internal", not "public", which means there is no public doc describing it.
   
   Arguably, re-sending the latest progress update with some of its values reset on idle is not useful at all. Users who want that can do it themselves. Why not avoid the confusion entirely while we can?





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40845:
URL: https://github.com/apache/spark/pull/40845#discussion_r1171861122


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -55,6 +55,12 @@ abstract class StreamingQueryListener {
    */
   def onQueryProgress(event: QueryProgressEvent): Unit
 
+  /**
+   * Called when the query is idle for a certain time period and waiting for new data to process.
+   * @since 3.5.0
+   */
+  def onQueryIdle(event: QueryIdleEvent): Unit = {}

Review Comment:
   Calling onQueryProgress in the default implementation is not feasible, as the whole point is NOT to give the latest progress update on idle.





[GitHub] [spark] HeartSaVioR closed pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #40845: [SPARK-43183][SS] Introduce a new callback "onQueryIdle" to StreamingQueryListener
URL: https://github.com/apache/spark/pull/40845

