Posted to reviews@spark.apache.org by "HyukjinKwon (via GitHub)" <gi...@apache.org> on 2023/08/22 05:56:19 UTC

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42521: [SPARK-44435][SS][CONNECT] Tests for foreachBatch and Listener

HyukjinKwon commented on code in PR #42521:
URL: https://github.com/apache/spark/pull/42521#discussion_r1300995528


##########
python/pyspark/sql/tests/connect/streaming/test_parity_listener.py:
##########
@@ -17,40 +17,205 @@
 
 import unittest
 import time
+import uuid
+import json
+from typing import Any, Dict, Union
 
 from pyspark.sql.tests.streaming.test_streaming_listener import StreamingListenerTestsMixin
-from pyspark.sql.streaming.listener import StreamingQueryListener, QueryStartedEvent
-from pyspark.sql.types import StructType, StructField, StringType
+from pyspark.sql.streaming.listener import (
+    StreamingQueryListener,
+    QueryStartedEvent,
+    QueryProgressEvent,
+    QueryIdleEvent,
+    QueryTerminatedEvent,
+    StateOperatorProgress,
+    StreamingQueryProgress,
+    SourceProgress,
+    SinkProgress,
+)
+from pyspark.sql.types import (
+    ArrayType,
+    StructType,
+    StructField,
+    StringType,
+    IntegerType,
+    FloatType,
+    MapType,
+)
+from pyspark.sql import Row
+from pyspark.sql.functions import count, lit
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
+def listener_event_as_dict(
+    e: Union[QueryStartedEvent, QueryProgressEvent, QueryIdleEvent, QueryTerminatedEvent]
+) -> Dict[str, Any]:
+    if isinstance(e, QueryProgressEvent):
+        return {"progress": streaming_query_progress_as_dict(e.progress)}
+    else:
+
+        def conv(obj: Any) -> Any:
+            if isinstance(obj, uuid.UUID):
+                return str(obj)
+            else:
+                return obj
+
+        return {k[1:]: conv(v) for k, v in e.__dict__.items()}
+
+
+def streaming_query_progress_as_dict(e: StreamingQueryProgress) -> Dict[str, Any]:

Review Comment:
   A simpler way might be to serialize the event with `pyspark.cloudpickle.dumps(event)`, save the result to a table, load it back, unpickle it with `pyspark.cloudpickle.loads(binary)`, and compare the two.
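   The round trip suggested above could be sketched as follows. This is a minimal illustration, not the code in the PR. `types.SimpleNamespace` stands in for a real listener event such as `QueryStartedEvent`. A plain Python list of `bytes` stands in for the table's binary column. The helpers `save_events` and `load_events` are hypothetical names. If pyspark is not installed, the sketch falls back to the standard `pickle` module, whose `dumps`/`loads` have the same interface.

```python
import pickle
import uuid
from types import SimpleNamespace
from typing import Any, List

try:
    # pyspark vendors cloudpickle; its dumps/loads mirror the pickle API.
    from pyspark import cloudpickle as _pickler
except ImportError:
    # Fallback so this sketch runs without pyspark installed.
    _pickler = pickle


def save_events(events: List[Any]) -> List[bytes]:
    # Hypothetical helper: stands in for writing each pickled event
    # into a binary column of a table from inside the listener.
    return [_pickler.dumps(e) for e in events]


def load_events(rows: List[bytes]) -> List[Any]:
    # Hypothetical helper: stands in for reading the table back and
    # unpickling each binary value on the test side.
    return [_pickler.loads(b) for b in rows]


# Stand-in for a QueryStartedEvent (not the real pyspark class).
event = SimpleNamespace(
    id=uuid.uuid4(),
    runId=uuid.uuid4(),
    name="test_query",
    timestamp="2023-08-22T05:56:19.000Z",
)

stored = save_events([event])
restored = load_events(stored)[0]

# Compare attribute dicts rather than the objects themselves, in case the
# event class does not define __eq__.
assert vars(restored) == vars(event)
```

   Comparing `vars(...)` rather than the objects is an assumption. It works for flat events whose fields live in `__dict__`. Nested objects such as the progress inside a `QueryProgressEvent` would still be compared by identity unless their classes define `__eq__`.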



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-help@spark.apache.org