Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2024/01/15 09:23:55 UTC

[PR] [SPARK-46722][CONNECT] Add a test regarding to backward compatibility check for StreamingQueryListener in Spark Connect (Scala/PySpark) [spark]

HeartSaVioR opened a new pull request, #44736:
URL: https://github.com/apache/spark/pull/44736

   ### What changes were proposed in this pull request?
   
   This PR proposes to add a backward compatibility check for StreamingQueryListener in Spark Connect (both Scala and PySpark), specifically covering listeners that do and do not implement onQueryIdle.
   
   ### Why are the changes needed?
   
   We missed adding a backward compatibility test when onQueryIdle was introduced, which led to an issue in PySpark (https://issues.apache.org/jira/browse/SPARK-45631). The compatibility test was added in PySpark but not in Spark Connect.
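
   For context, a listener written against the pre-3.5 interface does not define `onQueryIdle` at all, and the compatibility check is about such listeners continuing to work when registered through Spark Connect. The snippet below is only a minimal illustrative sketch (the class name is made up and this is not code from this PR), assuming the base class supplies a no-op `onQueryIdle` default:

   ```python
   from pyspark.sql.streaming import StreamingQueryListener

   # A listener written against the pre-3.5 interface: it overrides only
   # onQueryStarted/onQueryProgress/onQueryTerminated and does not define
   # onQueryIdle. Registering it should still work on Spark 3.5+ and on
   # Spark Connect, assuming the base class provides a no-op onQueryIdle.
   class LegacyListener(StreamingQueryListener):
       def onQueryStarted(self, event):
           print("query started:", event.id)

       def onQueryProgress(self, event):
           print("batch finished:", event.progress.batchId)

       def onQueryTerminated(self, event):
           print("query terminated:", event.id)

   # spark.streams.addListener(LegacyListener())  # must not raise despite the missing onQueryIdle
   ```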
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Modified UTs.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


Re: [PR] [SPARK-46722][CONNECT] Add a test regarding to backward compatibility check for StreamingQueryListener in Spark Connect (Scala/PySpark) [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44736:
URL: https://github.com/apache/spark/pull/44736#discussion_r1452117819


##########
python/pyspark/sql/tests/connect/streaming/test_parity_listener.py:
##########
@@ -26,77 +26,102 @@
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
-class TestListener(StreamingQueryListener):
+# V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
+# `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
+class TestListenerV1(StreamingQueryListener):
     def onQueryStarted(self, event):
         e = pyspark.cloudpickle.dumps(event)
         df = self.spark.createDataFrame(data=[(e,)])
-        df.write.mode("append").saveAsTable("listener_start_events")
+        df.write.mode("append").saveAsTable("listener_start_events_v1")
 
     def onQueryProgress(self, event):
         e = pyspark.cloudpickle.dumps(event)
         df = self.spark.createDataFrame(data=[(e,)])
-        df.write.mode("append").saveAsTable("listener_progress_events")
+        df.write.mode("append").saveAsTable("listener_progress_events_v1")
+
+    def onQueryTerminated(self, event):
+        e = pyspark.cloudpickle.dumps(event)
+        df = self.spark.createDataFrame(data=[(e,)])
+        df.write.mode("append").saveAsTable("listener_terminated_events_v1")
+
+
+# V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
+class TestListenerV2(StreamingQueryListener):
+    def onQueryStarted(self, event):
+        e = pyspark.cloudpickle.dumps(event)
+        df = self.spark.createDataFrame(data=[(e,)])
+        df.write.mode("append").saveAsTable("listener_start_events_v2")
+
+    def onQueryProgress(self, event):
+        e = pyspark.cloudpickle.dumps(event)
+        df = self.spark.createDataFrame(data=[(e,)])
+        df.write.mode("append").saveAsTable("listener_progress_events_v2")
 
     def onQueryIdle(self, event):
         pass
 
     def onQueryTerminated(self, event):
         e = pyspark.cloudpickle.dumps(event)
         df = self.spark.createDataFrame(data=[(e,)])
-        df.write.mode("append").saveAsTable("listener_terminated_events")
+        df.write.mode("append").saveAsTable("listener_terminated_events_v2")
 
 
 class StreamingListenerParityTests(StreamingListenerTestsMixin, ReusedConnectTestCase):
     def test_listener_events(self):
-        test_listener = TestListener()
-
-        try:
-            self.spark.streams.addListener(test_listener)
-
-            # This ensures the read socket on the server won't crash (i.e. because of timeout)
-            # when there hasn't been a new event for a long time
-            time.sleep(30)
-
-            df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
-            df_observe = df.observe("my_event", count(lit(1)).alias("rc"))
-            df_stateful = df_observe.groupBy().count()  # make query stateful
-            q = (
-                df_stateful.writeStream.format("noop")
-                .queryName("test")
-                .outputMode("complete")
-                .start()
-            )
-
-            self.assertTrue(q.isActive)
-            # ensure at least one batch is ran
-            while q.lastProgress is None or q.lastProgress["batchId"] == 0:
-                time.sleep(5)
-            q.stop()
-            self.assertFalse(q.isActive)
-
-            time.sleep(60)  # Sleep to make sure listener_terminated_events is written successfully
-
-            start_event = pyspark.cloudpickle.loads(
-                self.spark.read.table("listener_start_events").collect()[0][0]
-            )
-
-            progress_event = pyspark.cloudpickle.loads(
-                self.spark.read.table("listener_progress_events").collect()[0][0]
-            )
-
-            terminated_event = pyspark.cloudpickle.loads(
-                self.spark.read.table("listener_terminated_events").collect()[0][0]
-            )
-
-            self.check_start_event(start_event)
-            self.check_progress_event(progress_event)
-            self.check_terminated_event(terminated_event)
-
-        finally:
-            self.spark.streams.removeListener(test_listener)
-
-            # Remove again to verify this won't throw any error
-            self.spark.streams.removeListener(test_listener)
+        def verify(test_listener, table_postfix):

Review Comment:
   This is mostly an indentation change; the only real difference is that test_listener is now passed in from outside the method.
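
   Since the diff above is truncated at the helper's signature, here is a rough, illustrative sketch of how such a `verify` helper might be driven for both listener versions; the exact body and call sites in the PR may differ:

   ```python
   def test_listener_events(self):
       def verify(test_listener, table_postfix):
           # previous test body, with the event-table names suffixed by table_postfix
           ...

       # Run the same verification for the pre-3.5 listener (no onQueryIdle)
       # and the 3.5+ listener (with onQueryIdle); names match the diff above.
       verify(TestListenerV1(), "_v1")
       verify(TestListenerV2(), "_v2")
   ```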



Re: [PR] [SPARK-46722][CONNECT] Add a test regarding to backward compatibility check for StreamingQueryListener in Spark Connect (Scala/PySpark) [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44736:
URL: https://github.com/apache/spark/pull/44736#issuecomment-1892966337

   cc. @zsxwing @viirya @anishshri-db @HyukjinKwon Please take a look. Thanks!


Re: [PR] [SPARK-46722][CONNECT] Add a test regarding to backward compatibility check for StreamingQueryListener in Spark Connect (Scala/PySpark) [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #44736: [SPARK-46722][CONNECT] Add a test regarding to backward compatibility check for StreamingQueryListener in Spark Connect (Scala/PySpark)
URL: https://github.com/apache/spark/pull/44736


Re: [PR] [SPARK-46722][CONNECT] Add a test regarding to backward compatibility check for StreamingQueryListener in Spark Connect (Scala/PySpark) [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44736:
URL: https://github.com/apache/spark/pull/44736#issuecomment-1893013415

   Thanks! Merging to master.

