You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "chaoqin-li1123 (via GitHub)" <gi...@apache.org> on 2024/03/12 22:49:29 UTC

[PR] [SPARK-47107] Implement partition reader for python streaming data source [spark]

chaoqin-li1123 opened a new pull request, #45485:
URL: https://github.com/apache/spark/pull/45485

   ### What changes were proposed in this pull request?
   Piggy back the PythonPartitionReaderFactory to implement reading a data partition for python streaming data source. Add test case to verify that python streaming data source can read and process data end to end.
   
   
   ### Why are the changes needed?
   This is part of the effort to support developing streaming data source in python interface.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Add integration test to verify data are read and metrics are emitted correctly.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #45485:
URL: https://github.com/apache/spark/pull/45485#discussion_r1534824599


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -264,11 +246,40 @@ def batched(iterator: Iterator, n: int) -> Iterator:
         command = (data_source_read_func, return_type)
         pickleSer._write_with_length(command, outfile)
 
-        # Return the serialized partition values.
-        write_int(len(partitions), outfile)
-        for partition in partitions:
-            pickleSer._write_with_length(partition, outfile)
-
+        if not is_streaming:
+            # The partitioning of python batch source read is determined before query execution.
+            try:
+                partitions = reader.partitions()  # type: ignore[attr-defined]
+                if not isinstance(partitions, list):
+                    raise PySparkRuntimeError(
+                        error_class="DATA_SOURCE_TYPE_MISMATCH",
+                        message_parameters={
+                            "expected": "'partitions' to return a list",
+                            "actual": f"'{type(partitions).__name__}'",
+                        },
+                    )
+                if not all(isinstance(p, InputPartition) for p in partitions):
+                    partition_types = ", ".join([f"'{type(p).__name__}'" for p in partitions])
+                    raise PySparkRuntimeError(
+                        error_class="DATA_SOURCE_TYPE_MISMATCH",
+                        message_parameters={
+                            "expected": "elements in 'partitions' to be of type 'InputPartition'",
+                            "actual": partition_types,
+                        },
+                    )
+                if len(partitions) == 0:
+                    partitions = [None]
+            except NotImplementedError:
+                partitions = [None]
+
+            # Return the serialized partition values.
+            write_int(len(partitions), outfile)
+            for partition in partitions:
+                pickleSer._write_with_length(partition, outfile)
+        else:
+            # Send an empty list of partition for stream reader because partitions are planned
+            # in each microbatch during query execution.
+            write_int(0, outfile)

Review Comment:
   Got it thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45485:
URL: https://github.com/apache/spark/pull/45485#discussion_r1532121734


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -87,40 +90,178 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
     val stream = new PythonMicroBatchStream(
       pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty())
 
-    val initialOffset = stream.initialOffset()
-    assert(initialOffset.json == "{\"offset\": {\"partition-1\": 0}}")
-    for (_ <- 1 to 50) {
-      val offset = stream.latestOffset()
-      assert(offset.json == "{\"offset\": {\"partition-1\": 2}}")
-      assert(stream.planInputPartitions(initialOffset, offset).size == 2)
-      stream.commit(offset)
+    var startOffset = stream.initialOffset()
+    assert(startOffset.json == "{\"offset\": {\"partition-1\": 0}}")
+    for (i <- 1 to 50) {
+      val endOffset = stream.latestOffset()
+      assert(endOffset.json == s"""{"offset": {"partition-1": ${2 * i}}}""")
+      assert(stream.planInputPartitions(startOffset, endOffset).size == 2)
+      stream.commit(endOffset)
+      startOffset = endOffset
     }
     stream.stop()
   }
 
+  test("Read from simple data stream source") {
+    assume(shouldTestPandasUDFs)
+    val dataSourceScript =
+      s"""
+         |from pyspark.sql.datasource import DataSource
+         |$simpleDataStreamReaderScript
+         |
+         |class $dataSourceName(DataSource):
+         |    def schema(self) -> str:
+         |        return "id INT"
+         |    def streamReader(self, schema):
+         |        return SimpleDataStreamReader()
+         |""".stripMargin
+
+    val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)
+    assert(spark.sessionState.dataSourceManager.dataSourceExists(dataSourceName))
+    val df = spark.readStream.format(dataSourceName).load()
+
+    val stopSignal = new CountDownLatch(1)
+
+    val q = df.writeStream.foreachBatch((df: DataFrame, batchId: Long) => {
+      // checkAnswer may materialize the dataframe more than once
+      // Cache here to make sure the numInputRows metrics is consistent.
+      df.cache()
+      checkAnswer(df, Seq(Row(batchId * 2), Row(batchId * 2 + 1)))
+      if (batchId > 30) stopSignal.countDown()
+    }).start()

Review Comment:
   nit: probably good to set processing trigger with 0 delay explicitly, to ensure the overall test time would be limited.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -87,40 +90,178 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
     val stream = new PythonMicroBatchStream(
       pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty())
 
-    val initialOffset = stream.initialOffset()
-    assert(initialOffset.json == "{\"offset\": {\"partition-1\": 0}}")
-    for (_ <- 1 to 50) {
-      val offset = stream.latestOffset()
-      assert(offset.json == "{\"offset\": {\"partition-1\": 2}}")
-      assert(stream.planInputPartitions(initialOffset, offset).size == 2)
-      stream.commit(offset)
+    var startOffset = stream.initialOffset()
+    assert(startOffset.json == "{\"offset\": {\"partition-1\": 0}}")
+    for (i <- 1 to 50) {
+      val endOffset = stream.latestOffset()
+      assert(endOffset.json == s"""{"offset": {"partition-1": ${2 * i}}}""")
+      assert(stream.planInputPartitions(startOffset, endOffset).size == 2)
+      stream.commit(endOffset)
+      startOffset = endOffset
     }
     stream.stop()
   }
 
+  test("Read from simple data stream source") {
+    assume(shouldTestPandasUDFs)
+    val dataSourceScript =
+      s"""
+         |from pyspark.sql.datasource import DataSource
+         |$simpleDataStreamReaderScript
+         |
+         |class $dataSourceName(DataSource):
+         |    def schema(self) -> str:
+         |        return "id INT"
+         |    def streamReader(self, schema):
+         |        return SimpleDataStreamReader()
+         |""".stripMargin
+
+    val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)
+    assert(spark.sessionState.dataSourceManager.dataSourceExists(dataSourceName))
+    val df = spark.readStream.format(dataSourceName).load()
+
+    val stopSignal = new CountDownLatch(1)
+
+    val q = df.writeStream.foreachBatch((df: DataFrame, batchId: Long) => {
+      // checkAnswer may materialize the dataframe more than once
+      // Cache here to make sure the numInputRows metrics is consistent.
+      df.cache()
+      checkAnswer(df, Seq(Row(batchId * 2), Row(batchId * 2 + 1)))
+      if (batchId > 30) stopSignal.countDown()
+    }).start()
+    stopSignal.await()
+    assert(q.recentProgress.forall(_.numInputRows == 2))
+    q.stop()
+    q.awaitTermination()
+  }
+
+  test("Streaming data source read with custom partitions") {
+    assume(shouldTestPandasUDFs)
+    val dataSourceScript =
+      s"""
+         |from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
+         |class RangePartition(InputPartition):
+         |    def __init__(self, start, end):
+         |        self.start = start
+         |        self.end = end
+         |
+         |class SimpleDataStreamReader(DataSourceStreamReader):
+         |    current = 0
+         |    def initialOffset(self):
+         |        return {"offset": 0}
+         |    def latestOffset(self):
+         |        self.current += 2
+         |        return {"offset": self.current}
+         |    def partitions(self, start: dict, end: dict):
+         |        return [RangePartition(start["offset"], end["offset"])]
+         |    def commit(self, end: dict):
+         |        1 + 2
+         |    def read(self, partition: RangePartition):
+         |        start, end = partition.start, partition.end
+         |        for i in range(start, end):
+         |            yield (i, )
+         |
+         |
+         |class $dataSourceName(DataSource):
+         |    def schema(self) -> str:
+         |        return "id INT"
+         |
+         |    def streamReader(self, schema):
+         |        return SimpleDataStreamReader()
+         |""".stripMargin
+    val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)
+    assert(spark.sessionState.dataSourceManager.dataSourceExists(dataSourceName))
+    val df = spark.readStream.format(dataSourceName).load()
+
+    val stopSignal = new CountDownLatch(1)
+
+    val q = df.writeStream.foreachBatch((df: DataFrame, batchId: Long) => {
+      // checkAnswer may materialize the dataframe more than once
+      // Cache here to make sure the numInputRows metrics is consistent.
+      df.cache()
+      checkAnswer(df, Seq(Row(batchId * 2), Row(batchId * 2 + 1)))
+      if (batchId > 30) stopSignal.countDown()
+    }).start()

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on PR #45485:
URL: https://github.com/apache/spark/pull/45485#issuecomment-1993599088

   @HeartSaVioR @HyukjinKwon @allisonwang-db PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45485:
URL: https://github.com/apache/spark/pull/45485#issuecomment-2024028916

   In the meanwhile, I'm merging to master as CI has passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #45485:
URL: https://github.com/apache/spark/pull/45485#discussion_r1534755416


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -264,11 +246,40 @@ def batched(iterator: Iterator, n: int) -> Iterator:
         command = (data_source_read_func, return_type)
         pickleSer._write_with_length(command, outfile)
 
-        # Return the serialized partition values.
-        write_int(len(partitions), outfile)
-        for partition in partitions:
-            pickleSer._write_with_length(partition, outfile)
-
+        if not is_streaming:
+            # The partitioning of python batch source read is determined before query execution.
+            try:
+                partitions = reader.partitions()  # type: ignore[attr-defined]
+                if not isinstance(partitions, list):
+                    raise PySparkRuntimeError(
+                        error_class="DATA_SOURCE_TYPE_MISMATCH",
+                        message_parameters={
+                            "expected": "'partitions' to return a list",
+                            "actual": f"'{type(partitions).__name__}'",
+                        },
+                    )
+                if not all(isinstance(p, InputPartition) for p in partitions):
+                    partition_types = ", ".join([f"'{type(p).__name__}'" for p in partitions])
+                    raise PySparkRuntimeError(
+                        error_class="DATA_SOURCE_TYPE_MISMATCH",
+                        message_parameters={
+                            "expected": "elements in 'partitions' to be of type 'InputPartition'",
+                            "actual": partition_types,
+                        },
+                    )
+                if len(partitions) == 0:
+                    partitions = [None]
+            except NotImplementedError:
+                partitions = [None]
+
+            # Return the serialized partition values.
+            write_int(len(partitions), outfile)
+            for partition in partitions:
+                pickleSer._write_with_length(partition, outfile)
+        else:
+            # Send an empty list of partition for stream reader because partitions are planned
+            # in each microbatch during query execution.
+            write_int(0, outfile)

Review Comment:
   Could you give me some code pointers on where the planning of partitions takes place in streaming?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45485:
URL: https://github.com/apache/spark/pull/45485#issuecomment-2022059872

   @chaoqin-li1123 Could you please file a JIRA ticket to add end-to-end python test suite for both streaming read and streaming write? I'm not confident that ScalaTest based test is sufficient.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45485:
URL: https://github.com/apache/spark/pull/45485#discussion_r1534791960


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -264,11 +246,40 @@ def batched(iterator: Iterator, n: int) -> Iterator:
         command = (data_source_read_func, return_type)
         pickleSer._write_with_length(command, outfile)
 
-        # Return the serialized partition values.
-        write_int(len(partitions), outfile)
-        for partition in partitions:
-            pickleSer._write_with_length(partition, outfile)
-
+        if not is_streaming:
+            # The partitioning of python batch source read is determined before query execution.
+            try:
+                partitions = reader.partitions()  # type: ignore[attr-defined]
+                if not isinstance(partitions, list):
+                    raise PySparkRuntimeError(
+                        error_class="DATA_SOURCE_TYPE_MISMATCH",
+                        message_parameters={
+                            "expected": "'partitions' to return a list",
+                            "actual": f"'{type(partitions).__name__}'",
+                        },
+                    )
+                if not all(isinstance(p, InputPartition) for p in partitions):
+                    partition_types = ", ".join([f"'{type(p).__name__}'" for p in partitions])
+                    raise PySparkRuntimeError(
+                        error_class="DATA_SOURCE_TYPE_MISMATCH",
+                        message_parameters={
+                            "expected": "elements in 'partitions' to be of type 'InputPartition'",
+                            "actual": partition_types,
+                        },
+                    )
+                if len(partitions) == 0:
+                    partitions = [None]
+            except NotImplementedError:
+                partitions = [None]
+
+            # Return the serialized partition values.
+            write_int(len(partitions), outfile)
+            for partition in partitions:
+                pickleSer._write_with_length(partition, outfile)
+        else:
+            # Send an empty list of partition for stream reader because partitions are planned
+            # in each microbatch during query execution.
+            write_int(0, outfile)

Review Comment:
   It is in PythonMicrobatchStream's planInputPartitions() https://github.com/apache/spark/blob/6a27789ad7d59cd133653a49be0bb49729542abe/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala#L48



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45485: [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source
URL: https://github.com/apache/spark/pull/45485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45485:
URL: https://github.com/apache/spark/pull/45485#issuecomment-2021891418

   @chaoqin-li1123 Could you please rebase this? Looks like there are conflicts now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org