Posted to reviews@spark.apache.org by "chaoqin-li1123 (via GitHub)" <gi...@apache.org> on 2024/02/28 04:43:41 UTC

[PR] [WIP] implement Python streaming data sink [spark]

chaoqin-li1123 opened a new pull request, #45305:
URL: https://github.com/apache/spark/pull/45305

   
   ### What changes were proposed in this pull request?
   Implement the Python streaming data sink interface (DataSourceStreamWriter) so that Python data sources can write to streaming sinks.
   
   
   ### Why are the changes needed?
   Allow users to implement streaming data sinks in Python through the Python data source API.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Unit and integration tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   




Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1527735697


##########
python/pyspark/sql/datasource.py:
##########
@@ -516,6 +528,71 @@ def abort(self, messages: List["WriterCommitMessage"]) -> None:
         ...
 
 
+class DataSourceStreamWriter(ABC):
+    """
+    A base class for data stream writers. Data stream writers are responsible for writing
+    the data to the streaming sink.
+
+    .. versionadded: 4.0.0
+    """
+
+    @abstractmethod
+    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+        """
+        Writes data into the streaming sink.
+
+        This method is called on executors to write data to the streaming data sink in
+        each microbatch. It accepts an iterator of input data and returns a single row
+        representing a commit message, or None if there is no commit message.
+
+        The driver collects commit messages, if any, from all executors and passes them
+        to the ``commit`` method if all tasks run successfully. If any task fails, the
+        ``abort`` method will be called with the collected commit messages.
+
+        Parameters
+        ----------
+        iterator : Iterator[Row]
+            An iterator of input data.
+
+        Returns
+        -------
+        WriterCommitMessage : a serializable commit message
+        """
+        ...
+
+    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+        """
+        Commits this microbatch with a list of commit messages.
+
+        This method is invoked on the driver when all tasks run successfully. The
+        commit messages are collected from the ``write`` method call from each task,
+        and are passed to this method. The implementation should use the commit messages
+        to commit the microbatch in the streaming sink.
+
+        Parameters
+        ----------
+        messages : List[WriterCommitMessage]
+            A list of commit messages.
+        """
+        ...
+
+    def abort(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+        """
+        Aborts this microbatch due to task failures.
+
+        This method is invoked on the driver when one or more tasks failed. The commit
+        messages are collected from the ``write`` method call from each task, and are
+        passed to this method. The implementation should use the commit messages to
+        abort the microbatch in the streaming sink.
+
+        Parameters
+        ----------
+        messages : List[WriterCommitMessage]

Review Comment:
   `` list of :class:`WriterCommitMessage` ``
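
For readers following the thread: the write/commit/abort contract described in the docstring above amounts, on the user side, to something like the sketch below. This is a hypothetical illustration, not code from the PR; SimpleCommitMessage, DirectorySinkWriter and the base_path value are made-up names, and the sketch assumes a path visible to both driver and executors (local mode or a shared filesystem).

import json
import os
import uuid
from dataclasses import dataclass
from typing import Iterator, List

from pyspark.sql import Row
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage


@dataclass
class SimpleCommitMessage(WriterCommitMessage):
    # Sent from each task back to the driver.
    path: str
    count: int


class DirectorySinkWriter(DataSourceStreamWriter):
    """Writes every partition of each microbatch to a JSON file under base_path."""

    def __init__(self, base_path: str):
        self.base_path = base_path

    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
        # Runs on executors, once per partition per microbatch.
        rows = [row.asDict() for row in iterator]
        path = os.path.join(self.base_path, f"part-{uuid.uuid4().hex}.json")
        with open(path, "w") as f:
            json.dump(rows, f)
        return SimpleCommitMessage(path=path, count=len(rows))

    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
        # Runs on the driver once all tasks of the microbatch succeed.
        total = sum(m.count for m in messages)
        with open(os.path.join(self.base_path, f"_SUCCESS-{batchId}"), "w") as f:
            f.write(str(total))

    def abort(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
        # Runs on the driver if any task of the microbatch fails.
        for m in messages:
            if m is not None and os.path.exists(m.path):
                os.remove(m.path)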





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1531233108


##########
python/pyspark/sql/datasource.py:
##########
@@ -513,6 +536,71 @@ def abort(self, messages: List["WriterCommitMessage"]) -> None:
         ...
 
 
+class DataSourceStreamWriter(ABC):
+    """
+    A base class for data stream writers. Data stream writers are responsible for writing
+    the data to the streaming sink.
+
+    .. versionadded: 4.0.0
+    """
+
+    @abstractmethod
+    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+        """
+        Writes data into the streaming sink.
+
+        This method is called on executors to write data to the streaming data sink in
+        each microbatch. It accepts an iterator of input data and returns a single row
+        representing a commit message, or None if there is no commit message.
+
+        The driver collects commit messages, if any, from all executors and passes them
+        to the ``commit`` method if all tasks run successfully. If any task fails, the
+        ``abort`` method will be called with the collected commit messages.
+
+        Parameters
+        ----------
+        iterator : Iterator[Row]
+            An iterator of input data.
+
+        Returns
+        -------
+        WriterCommitMessage : a serializable commit message
+        """
+        ...
+
+    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+        """
+        Commits this microbatch with a list of commit messages.
+
+        This method is invoked on the driver when all tasks run successfully. The
+        commit messages are collected from the ``write`` method call from each task,
+        and are passed to this method. The implementation should use the commit messages
+        to commit the microbatch in the streaming sink.
+
+        Parameters
+        ----------
+        messages : List[WriterCommitMessage]
+            A list of commit messages.
+        """

Review Comment:
   how about `batchId`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.StructType
+
+class PythonStreamingSinkCommitRunner(

Review Comment:
   Can we add a scaladoc for this class?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingWrite.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.python
+
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
+import org.apache.spark.sql.types.StructType
+
+class PythonStreamingWrite(

Review Comment:
   ditto for scaladoc.



##########
python/pyspark/sql/worker/python_streaming_sink_runner.py:
##########
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import IO
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_bool,
+    read_int,
+    read_long,
+    write_int,
+    SpecialLengths,
+)
+from pyspark.sql.datasource import DataSource, WriterCommitMessage
+from pyspark.sql.types import (
+    _parse_datatype_json_string,
+    StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    try:
+        check_python_version(infile)
+        setup_spark_files(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))

Review Comment:
   nit: you might want to use a different config for streaming workers.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -230,4 +273,220 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
       stream => stream.commit(offset)
     }
   }
+
+  Seq("append", "complete").foreach { mode =>
+    test(s"data source stream write - $mode mode") {
+      assume(shouldTestPandasUDFs)
+      val dataSource =
+        createUserDefinedPythonDataSource(dataSourceName, simpleDataStreamWriterScript)
+      spark.dataSource.registerPython(dataSourceName, dataSource)
+      val inputData = MemoryStream[Int]
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val checkpointDir = new File(path, "checkpoint")
+        checkpointDir.mkdir()
+        val outputDir = new File(path, "output")
+        outputDir.mkdir()
+        val streamDF = if (mode == "append") {
+          inputData.toDF()
+        } else {
+          // Complete mode only supports stateful aggregation

Review Comment:
   Forgive my limited knowledge of streaming, but what does 'complete' mode mean here? Is it the same as setting 'overwrite=true'?



##########
python/pyspark/sql/datasource.py:
##########
@@ -513,6 +536,71 @@ def abort(self, messages: List["WriterCommitMessage"]) -> None:
         ...
 
 
+class DataSourceStreamWriter(ABC):
+    """
+    A base class for data stream writers. Data stream writers are responsible for writing
+    the data to the streaming sink.
+
+    .. versionadded: 4.0.0
+    """
+
+    @abstractmethod
+    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+        """
+        Writes data into the streaming sink.
+
+        This method is called on executors to write data to the streaming data sink in
+        each microbatch. It accepts an iterator of input data and returns a single row
+        representing a commit message, or None if there is no commit message.
+
+        The driver collects commit messages, if any, from all executors and passes them
+        to the ``commit`` method if all tasks run successfully. If any task fails, the
+        ``abort`` method will be called with the collected commit messages.
+
+        Parameters
+        ----------
+        iterator : Iterator[Row]
+            An iterator of input data.
+
+        Returns
+        -------
+        WriterCommitMessage : a serializable commit message
+        """
+        ...
+
+    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+        """
+        Commits this microbatch with a list of commit messages.
+
+        This method is invoked on the driver when all tasks run successfully. The
+        commit messages are collected from the ``write`` method call from each task,
+        and are passed to this method. The implementation should use the commit messages
+        to commit the microbatch in the streaming sink.
+
+        Parameters
+        ----------
+        messages : List[WriterCommitMessage]
+            A list of commit messages.
+        """
+        ...
+
+    def abort(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+        """
+        Aborts this microbatch due to task failures.
+
+        This method is invoked on the driver when one or more tasks failed. The commit
+        messages are collected from the ``write`` method call from each task, and are
+        passed to this method. The implementation should use the commit messages to
+        abort the microbatch in the streaming sink.
+
+        Parameters
+        ----------
+        messages : List[WriterCommitMessage]
+            A list of commit messages.

Review Comment:
   ditto for `batchId`





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1527735412


##########
python/pyspark/sql/datasource.py:
##########
@@ -516,6 +528,71 @@ def abort(self, messages: List["WriterCommitMessage"]) -> None:
         ...
 
 
+class DataSourceStreamWriter(ABC):
+    """
+    A base class for data stream writers. Data stream writers are responsible for writing
+    the data to the streaming sink.
+
+    .. versionadded: 4.0.0
+    """
+
+    @abstractmethod
+    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+        """
+        Writes data into the streaming sink.
+
+        This method is called on executors to write data to the streaming data sink in
+        each microbatch. It accepts an iterator of input data and returns a single row
+        representing a commit message, or None if there is no commit message.
+
+        The driver collects commit messages, if any, from all executors and passes them
+        to the ``commit`` method if all tasks run successfully. If any task fails, the
+        ``abort`` method will be called with the collected commit messages.
+
+        Parameters
+        ----------
+        iterator : Iterator[Row]
+            An iterator of input data.
+
+        Returns
+        -------
+        WriterCommitMessage : a serializable commit message

Review Comment:
   This also should really be `` :class:`WriterCommitMessage` ``





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1531506644


##########
python/pyspark/sql/datasource.py:
##########
@@ -513,6 +536,71 @@ def abort(self, messages: List["WriterCommitMessage"]) -> None:
         ...
 
 
+class DataSourceStreamWriter(ABC):
+    """
+    A base class for data stream writers. Data stream writers are responsible for writing
+    the data to the streaming sink.
+
+    .. versionadded: 4.0.0
+    """
+
+    @abstractmethod
+    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+        """
+        Writes data into the streaming sink.
+
+        This method is called on executors to write data to the streaming data sink in
+        each microbatch. It accepts an iterator of input data and returns a single row
+        representing a commit message, or None if there is no commit message.
+
+        The driver collects commit messages, if any, from all executors and passes them
+        to the ``commit`` method if all tasks run successfully. If any task fails, the
+        ``abort`` method will be called with the collected commit messages.
+
+        Parameters
+        ----------
+        iterator : Iterator[Row]
+            An iterator of input data.
+
+        Returns
+        -------
+        WriterCommitMessage : a serializable commit message
+        """
+        ...
+
+    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+        """
+        Commits this microbatch with a list of commit messages.
+
+        This method is invoked on the driver when all tasks run successfully. The
+        commit messages are collected from the ``write`` method call from each task,
+        and are passed to this method. The implementation should use the commit messages
+        to commit the microbatch in the streaming sink.
+
+        Parameters
+        ----------
+        messages : List[WriterCommitMessage]
+            A list of commit messages.
+        """

Review Comment:
   Doc for batchId added.





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1538869646


##########
python/pyspark/sql/datasource.py:
##########
@@ -160,6 +160,29 @@ def writer(self, schema: StructType, overwrite: bool) -> "DataSourceWriter":
             message_parameters={"feature": "writer"},
         )
 
+    def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStreamWriter":

Review Comment:
   What's the purpose of the flag for overwrite? Will this be matched with complete mode? (truncate?)



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -230,4 +273,220 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
       stream => stream.commit(offset)
     }
   }
+
+  Seq("append", "complete").foreach { mode =>
+    test(s"data source stream write - $mode mode") {
+      assume(shouldTestPandasUDFs)
+      val dataSource =
+        createUserDefinedPythonDataSource(dataSourceName, simpleDataStreamWriterScript)
+      spark.dataSource.registerPython(dataSourceName, dataSource)
+      val inputData = MemoryStream[Int]
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val checkpointDir = new File(path, "checkpoint")
+        checkpointDir.mkdir()
+        val outputDir = new File(path, "output")
+        outputDir.mkdir()
+        val streamDF = if (mode == "append") {
+          inputData.toDF()
+        } else {
+          // Complete mode only supports stateful aggregation

Review Comment:
   Complete mode requires the sink to support truncate.
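
Putting the two review comments in this message together: the overwrite flag passed to streamWriter is how complete mode's truncate requirement is surfaced to a Python sink. A minimal, hypothetical sketch (CountingSink and CountingWriter are made-up names; how a real sink actually honors truncation is left to its implementation):

from dataclasses import dataclass
from typing import Iterator, List

from pyspark.sql import Row
from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamWriter,
    WriterCommitMessage,
)
from pyspark.sql.types import StructType


@dataclass
class CountMessage(WriterCommitMessage):
    count: int


class CountingWriter(DataSourceStreamWriter):
    def __init__(self, truncate: bool):
        # True when the query runs in complete output mode: each microbatch
        # is expected to replace, not extend, previously written results.
        self.truncate = truncate

    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
        return CountMessage(count=sum(1 for _ in iterator))

    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
        total = sum(m.count for m in messages)
        print(f"batch {batchId}: {total} rows (truncate={self.truncate})")


class CountingSink(DataSource):
    @classmethod
    def name(cls) -> str:
        return "counting_sink"

    def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStreamWriter":
        # overwrite=True is the signal discussed above (complete mode / truncate).
        return CountingWriter(truncate=overwrite)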





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1527735948


##########
python/pyspark/sql/datasource.py:
##########
@@ -160,6 +160,18 @@ def writer(self, schema: StructType, overwrite: bool) -> "DataSourceWriter":
             message_parameters={"feature": "writer"},
         )
 
+    def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStreamWriter":
+        """
+
+        :param schema:
+        :param overwrite:
+        :return:

Review Comment:
   docs?





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on PR #45305:
URL: https://github.com/apache/spark/pull/45305#issuecomment-1995481769

   PTAL, thanks! @HeartSaVioR @HyukjinKwon @allisonwang-db @sahnib




Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45305: [SPARK-47273][SS][PYTHON] implement Python data stream writer interface.
URL: https://github.com/apache/spark/pull/45305




Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1531248469


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -230,4 +273,220 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
       stream => stream.commit(offset)
     }
   }
+
+  Seq("append", "complete").foreach { mode =>
+    test(s"data source stream write - $mode mode") {
+      assume(shouldTestPandasUDFs)
+      val dataSource =
+        createUserDefinedPythonDataSource(dataSourceName, simpleDataStreamWriterScript)
+      spark.dataSource.registerPython(dataSourceName, dataSource)
+      val inputData = MemoryStream[Int]
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val checkpointDir = new File(path, "checkpoint")
+        checkpointDir.mkdir()
+        val outputDir = new File(path, "output")
+        outputDir.mkdir()
+        val streamDF = if (mode == "append") {
+          inputData.toDF()
+        } else {
+          // Complete mode only supports stateful aggregation

Review Comment:
   My understanding is that Complete mode is equivalent to 'overwrite=true'. Correct me if I am wrong. @HeartSaVioR 
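
For context on the mode itself: in complete mode the query re-emits the full aggregated result on every microbatch, so the sink must be able to replace what it wrote previously; that truncate requirement is what the overwrite flag conveys, as confirmed elsewhere in this thread. On the query side, usage would look roughly like the sketch below, assuming a hypothetical Python data source registered under the name "counting_sink" (such as the one sketched earlier):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Assumes the Python data source was registered beforehand, e.g.
# spark.dataSource.register(CountingSink).

events = spark.readStream.format("rate").load()
counts = events.groupBy("value").count()  # complete mode requires a stateful aggregation

query = (
    counts.writeStream
    .format("counting_sink")
    .outputMode("complete")  # the sink's streamWriter is created with overwrite=True
    .option("checkpointLocation", "/tmp/counting_sink_checkpoint")
    .start()
)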





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1527736420


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -16,15 +16,24 @@
  */
 package org.apache.spark.sql.execution.python
 
+import java.io.File
+
+import scala.concurrent.duration._
+
 import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs}
 import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
 
+  import testImplicits._
+
+  val waitTimeout = 5.seconds

Review Comment:
   Are 5 secs enough? I would set it to 10 or 30 secs to be sure. Streaming with Python is pretty flaky and slow from time to time.





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1529059252


##########
python/pyspark/sql/datasource.py:
##########
@@ -516,6 +528,71 @@ def abort(self, messages: List["WriterCommitMessage"]) -> None:
         ...
 
 
+class DataSourceStreamWriter(ABC):
+    """
+    A base class for data stream writers. Data stream writers are responsible for writing
+    the data to the streaming sink.
+
+    .. versionadded: 4.0.0
+    """
+
+    @abstractmethod
+    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+        """
+        Writes data into the streaming sink.
+
+        This method is called on executors to write data to the streaming data sink in
+        each microbatch. It accepts an iterator of input data and returns a single row
+        representing a commit message, or None if there is no commit message.
+
+        The driver collects commit messages, if any, from all executors and passes them
+        to the ``commit`` method if all tasks run successfully. If any task fails, the
+        ``abort`` method will be called with the collected commit messages.
+
+        Parameters
+        ----------
+        iterator : Iterator[Row]

Review Comment:
   Sure, will create a separate PR to fix all these.





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1540422868


##########
python/pyspark/sql/datasource.py:
##########
@@ -160,6 +160,29 @@ def writer(self, schema: StructType, overwrite: bool) -> "DataSourceWriter":
             message_parameters={"feature": "writer"},
         )
 
+    def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStreamWriter":

Review Comment:
   I see it's for truncate. Never mind.





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1529058123


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -16,15 +16,24 @@
  */
 package org.apache.spark.sql.execution.python
 
+import java.io.File
+
+import scala.concurrent.duration._
+
 import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs}
 import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
 
+  import testImplicits._
+
+  val waitTimeout = 5.seconds

Review Comment:
   Changed to 15 seconds.



##########
python/pyspark/sql/datasource.py:
##########
@@ -160,6 +160,18 @@ def writer(self, schema: StructType, overwrite: bool) -> "DataSourceWriter":
             message_parameters={"feature": "writer"},
         )
 
+    def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStreamWriter":
+        """
+
+        :param schema:
+        :param overwrite:
+        :return:

Review Comment:
   Docs added.





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #45305:
URL: https://github.com/apache/spark/pull/45305#issuecomment-2002705216

   Let's also fix up the linter failure.




Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1531506553


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.StructType
+
+class PythonStreamingSinkCommitRunner(

Review Comment:
   Scaladoc added.



##########
python/pyspark/sql/worker/python_streaming_sink_runner.py:
##########
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import IO
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_bool,
+    read_int,
+    read_long,
+    write_int,
+    SpecialLengths,
+)
+from pyspark.sql.datasource import DataSource, WriterCommitMessage
+from pyspark.sql.types import (
+    _parse_datatype_json_string,
+    StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    try:
+        check_python_version(infile)
+        setup_spark_files(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))

Review Comment:
   The sink commit runner is relatively lightweight, so reusing the planner memory limit should be fine.





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1527735211


##########
python/pyspark/sql/datasource.py:
##########
@@ -516,6 +528,71 @@ def abort(self, messages: List["WriterCommitMessage"]) -> None:
         ...
 
 
+class DataSourceStreamWriter(ABC):
+    """
+    A base class for data stream writers. Data stream writers are responsible for writing
+    the data to the streaming sink.
+
+    .. versionadded: 4.0.0
+    """
+
+    @abstractmethod
+    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+        """
+        Writes data into the streaming sink.
+
+        This method is called on executors to write data to the streaming data sink in
+        each microbatch. It accepts an iterator of input data and returns a single row
+        representing a commit message, or None if there is no commit message.
+
+        The driver collects commit messages, if any, from all executors and passes them
+        to the ``commit`` method if all tasks run successfully. If any task fails, the
+        ``abort`` method will be called with the collected commit messages.
+
+        Parameters
+        ----------
+        iterator : Iterator[Row]

Review Comment:
   Would you mind creating a PR to fix those all up after several related PRs merged? It really has to be something like `` iterator of :class:`Row` ``





Re: [PR] [SPARK-47273][SS][PYTHON] implement Python data stream writer interface. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45305:
URL: https://github.com/apache/spark/pull/45305#issuecomment-2021873464

   I expect the "python" end-to-end tests to be added later. Other than that, we can move forward.
   
   Thanks! Merging to master.
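
For the record, a Python-side end-to-end exercise of the new interface would presumably look something like the sketch below. The names (FileCountSink, file_count_sink, etc.) are hypothetical, and the actual end-to-end tests were left to a follow-up as noted above.

import glob
import os
import tempfile
from dataclasses import dataclass
from typing import Iterator, List

from pyspark.sql import Row, SparkSession
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage
from pyspark.sql.types import StructType


@dataclass
class FileCountMessage(WriterCommitMessage):
    count: int


class FileCountWriter(DataSourceStreamWriter):
    def __init__(self, path: str):
        self.path = path

    def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
        return FileCountMessage(count=sum(1 for _ in iterator))

    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
        # One marker file per committed microbatch, holding the total row count.
        with open(os.path.join(self.path, f"{batchId}.txt"), "w") as f:
            f.write(str(sum(m.count for m in messages)))


class FileCountSink(DataSource):
    @classmethod
    def name(cls) -> str:
        return "file_count_sink"

    def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStreamWriter":
        return FileCountWriter(self.options["path"])


spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(FileCountSink)

out_dir = tempfile.mkdtemp()
query = (
    spark.readStream.format("rate").load()
    .writeStream.format("file_count_sink")
    .option("path", out_dir)
    .option("checkpointLocation", tempfile.mkdtemp())
    .start()
)
query.processAllAvailable()
query.stop()
assert len(glob.glob(os.path.join(out_dir, "*.txt"))) > 0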

