Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/08/16 21:30:21 UTC

[GitHub] [spark] WweiL commented on a diff in pull request #42521: [SPARK-44435][SS][CONNECT][DRAFT] Tests for foreachBatch and Listener

WweiL commented on code in PR #42521:
URL: https://github.com/apache/spark/pull/42521#discussion_r1296449283


##########
python/pyspark/sql/tests/connect/streaming/test_parity_listener.py:
##########
@@ -19,38 +19,153 @@
 import time
 
 from pyspark.sql.tests.streaming.test_streaming_listener import StreamingListenerTestsMixin
-from pyspark.sql.streaming.listener import StreamingQueryListener, QueryStartedEvent
-from pyspark.sql.types import StructType, StructField, StringType
+from pyspark.sql.streaming.listener import (
+    StreamingQueryListener,
+    QueryStartedEvent,
+    QueryProgressEvent,
+    QueryIdleEvent,
+    QueryTerminatedEvent,
+)
+from pyspark.sql.types import (
+    ArrayType,
+    StructType,
+    StructField,
+    StringType,
+    IntegerType,
+    FloatType,
+    MapType,
+)
+from pyspark.sql.functions import count, lit
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
 def get_start_event_schema():
     return StructType(
         [
-            StructField("id", StringType(), True),
-            StructField("runId", StringType(), True),
+            StructField("id", StringType(), False),
+            StructField("runId", StringType(), False),
             StructField("name", StringType(), True),
-            StructField("timestamp", StringType(), True),
+            StructField("timestamp", StringType(), False),
         ]
     )
 
 
+def get_idle_event_schema():
+    return StructType(
+        [
+            StructField("id", StringType(), False),
+            StructField("runId", StringType(), False),
+            StructField("timestamp", StringType(), False),
+        ]
+    )
+
+
+def get_terminated_event_schema():

Review Comment:
   I'm thinking of just moving these methods onto the `QueryxxxEvent` classes, and maybe even adding a method called `asDataFrame`, to save users the effort of building this themselves. For example, before:
   
   ```python
   class TestListener(StreamingQueryListener):
       def onQueryStarted(self, event):
           df = self.spark.createDataFrame(
               data=[(event.asDict())],
               schema=event.schema(),
           )
           df.write.saveAsTable("listener_start_events")
   ```
   
   After:
   
   ```python
   class TestListener(StreamingQueryListener):
       def onQueryStarted(self, event):
           df = event.asDataFrame(self.spark)
           df.write.saveAsTable("listener_start_events")
   ```
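
A rough sketch of what such a helper might look like, purely illustrative: the mixin name is made up here, and the `asDict()`/`schema()` methods it leans on are assumptions taken from the "before" snippet, not code from this PR.

```python
class AsDataFrameMixin:
    """Hypothetical mixin giving each QueryxxxEvent an asDataFrame() method.

    Assumes the event class already provides asDict() and schema(),
    as in the "before" example above.
    """

    def asDataFrame(self, spark):
        # Wrap this single event in a one-row DataFrame, using the
        # event's own schema so listeners need not rebuild it by hand.
        return spark.createDataFrame(data=[self.asDict()], schema=self.schema())
```

A listener could then write `event.asDataFrame(self.spark).write.saveAsTable(...)` directly, as in the "after" example.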



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org