Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/05/03 00:51:04 UTC

[GitHub] [spark] WweiL opened a new pull request, #41026: [Spark-43132] [DO NOT REVIEW] foreach

WweiL opened a new pull request, #41026:
URL: https://github.com/apache/spark/pull/41026

   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184008814


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -352,10 +355,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         catalogTable = catalogTable)
       resultDf.createOrReplaceTempView(query.name)
       query
-    } else if (source == SOURCE_NAME_FOREACH) {
+    } else if (source == SOURCE_NAME_FOREACH && foreachWriter != null) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
       val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
       startQuery(sink, extraOptions, catalogTable = catalogTable)
+    } else if (source == SOURCE_NAME_FOREACH) {
+      assertNotPartitioned(SOURCE_NAME_FOREACH)
+      val sink = new ForeachWriterTable[UnsafeRow](
+        pythonForeachWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))

Review Comment:
   A copy from 
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala#L96-L108
   
   See my comment above





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194355729


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -532,7 +547,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
-  private var foreachWriter: ForeachWriter[T] = null
+  private var foreachWriter: ForeachWriter[Any] = null
+
+  private var foreachWriterEncoder: Either[ExpressionEncoder[Any], InternalRow => Any] =
+    Left(ds.exprEnc.asInstanceOf[ExpressionEncoder[Any]])

Review Comment:
   @hvanhovell Hi Herman, can you sign off on this change? It can be viewed together with this PR for more context: 
   https://github.com/apache/spark/pull/41129/files#r1194152295
   
   Thanks in advance!





[GitHub] [spark] WweiL commented on pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #41026:
URL: https://github.com/apache/spark/pull/41026#issuecomment-1533441031

   @HyukjinKwon @HeartSaVioR @xinrong-meng @rangadi @pengzhon-db @amaliujia Can you guys take a look? Thanks!




[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1183993090


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
 
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
-      // DataStreamReader
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
-      ),
-
       // DataStreamWriter
       ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
-      ),
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),

Review Comment:
   Please see my change in core DataStreamWriter



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -352,10 +355,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         catalogTable = catalogTable)
       resultDf.createOrReplaceTempView(query.name)
       query
-    } else if (source == SOURCE_NAME_FOREACH) {
+    } else if (source == SOURCE_NAME_FOREACH && foreachWriter != null) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
       val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
       startQuery(sink, extraOptions, catalogTable = catalogTable)
+    } else if (source == SOURCE_NAME_FOREACH) {
+      assertNotPartitioned(SOURCE_NAME_FOREACH)
+      val sink = new ForeachWriterTable[UnsafeRow](
+        pythonForeachWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))

Review Comment:
   A copy from 
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala#L96-L108
   
   See my comment below



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I definitely think a discussion is needed here. I added this because `foreachWriter` cannot be used here: its type parameter is the same as the DataStreamWriter's, which is `Row`, but `PythonForeachWriter` extends `ForeachWriter[UnsafeRow]`:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
   
   Therefore I can't just create a `PythonForeachWriter` and call `writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
   
   The original caller is in Python:
   https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
   
   Maybe that circumvents this check somehow. I'd be very interested if anyone knows why.





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1185338014


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   @amaliujia Hi Rui, I'm not an expert on protobuf... Can I get some input from you? Thanks in advance!





[GitHub] [spark] pengzhon-db commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "pengzhon-db (via GitHub)" <gi...@apache.org>.
pengzhon-db commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1192857264


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   It feels like we could reuse `PythonUDF`. Then you wouldn't need `transformPythonForeachFunction` on the server, which is a duplicate of the existing one?






[GitHub] [spark] xinrong-meng commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1190166838


##########
python/pyspark/sql/utils.py:
##########
@@ -119,6 +124,79 @@ class Java:
         implements = ["org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchFunction"]
 
 
+def construct_foreach_function(f: Union[Callable[[Row], None], "SupportsProcess"]):
+    from pyspark.taskcontext import TaskContext
+
+    if callable(f):
+        # The provided object is a callable function that is supposed to be called on each row.
+        # Construct a function that takes an iterator and calls the provided function on each
+        # row.
+        def func_without_process(_: Any, iterator: Iterator) -> Iterator:
+            for x in iterator:
+                f(x)  # type: ignore[operator]
+            return iter([])
+
+        return func_without_process
+
+    else:
+        # The provided object is not a callable function. Then it is expected to have a
+        # 'process(row)' method, and optional 'open(partition_id, epoch_id)' and
+        # 'close(error)' methods.
+
+        if not hasattr(f, "process"):
+            raise AttributeError("Provided object does not have a 'process' method")

Review Comment:
   How about raising a `PySparkAttributeError`?
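
The callable-versus-object dispatch in the hunk above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the PR's code: `wrap_foreach` and `Collector` are hypothetical names, the optional `open`/`close` hooks are left out, and the built-in `AttributeError` stands in for whichever error class the PR ends up raising.

```python
from typing import Any, Callable, Iterable, Iterator, List


def wrap_foreach(f: Any) -> Callable[[Any, Iterable], Iterator]:
    """Turn a per-row callable, or an object with process(row), into a per-partition function."""
    if callable(f):
        # A plain function: call it directly on every row.
        process = f
    elif hasattr(f, "process"):
        # An object: delegate every row to its process(row) method.
        process = f.process
    else:
        raise AttributeError("Provided object does not have a 'process' method")

    def per_partition(_: Any, iterator: Iterable) -> Iterator:
        # The first argument (a partition index in this sketch) is ignored.
        for row in iterator:
            process(row)
        # A sink produces no output rows.
        return iter([])

    return per_partition


class Collector:
    """Hypothetical writer object that records every row it is given."""

    def __init__(self) -> None:
        self.rows: List[Any] = []

    def process(self, row: Any) -> None:
        self.rows.append(row)


# Usage with a plain callable.
seen: List[Any] = []
wrap_foreach(seen.append)(0, iter([1, 2, 3]))

# Usage with an object exposing process(row).
collector = Collector()
wrap_foreach(collector)(0, iter(["a", "b"]))
```

Both forms end up behind the same per-partition signature, so the code that ships the function to the server does not need to know which one the user supplied.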





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1193370059


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   `UnsafeRow` does not extend `Row`. Spark SQL separates the internal row representation, `InternalRow`, from the public `Row`. `UnsafeRow` extends `InternalRow`.
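
The relationship described in this comment can be modelled with stand-in classes. This is only a rough sketch: the three classes below are empty placeholders that borrow the Spark names, and `require_row` is a hypothetical runtime check standing in for what the Scala compiler enforces statically.

```python
class Row:
    """Stand-in for the public row type."""


class InternalRow:
    """Stand-in for the internal row type."""


class UnsafeRow(InternalRow):
    """Stand-in for the binary internal row; it derives from InternalRow only."""


def require_row(value: object) -> Row:
    # Hypothetical check: a consumer typed for public rows rejects internal ones.
    if not isinstance(value, Row):
        raise TypeError(f"expected Row, got {type(value).__name__}")
    return value


is_internal = issubclass(UnsafeRow, InternalRow)
is_public = issubclass(UnsafeRow, Row)
```

Because the two hierarchies are disjoint, a writer declared for `Row` has no subtype relationship with one declared for `UnsafeRow`, which matches the type mismatch reported elsewhere in this thread.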





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184008423


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I definitely think a discussion is needed here. I added this because `foreachWriter` cannot be used here. `foreachWriter`'s type parameter needs to be the same as the DataStreamWriter's, which is `Row`. But `PythonForeachWriter` extends `ForeachWriter[UnsafeRow]`:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
   
   Therefore I can't just create a `PythonForeachWriter` and call `writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
   
   The compiler complains:
   ```
   [error] /home/wei.liu/oss-spark/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:2248:22: type mismatch;
   [error]  found   : org.apache.spark.sql.execution.python.PythonForeachWriter
   [error]  required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
   [error]       writer.foreach(new PythonForeachWriter(pythonFcn, parseSchema(foreach.getSchema)))
   ```
   
   The original caller, in Python, doesn't do this additional step and just calls `foreach`:
   https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
   
   Maybe that circumvents this check somehow. I'd be very interested if anyone knows why.







[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184223920


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   Right, we can reuse it. But I think foreach conceptually doesn't belong to `PythonUDF`. We probably need to refactor this, maybe in a follow-up PR? I'm thinking there should be a more general `message PythonFunction` that wraps all the info; `PythonUDF` could wrap around it, and here we could use it directly. WDYT?





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1195753208


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -354,7 +355,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       query
     } else if (source == SOURCE_NAME_FOREACH) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
-      val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
+      val sink = ForeachWriterTable[Any](foreachWriter,
+        ds.exprEnc.asInstanceOf[ExpressionEncoder[Any]])

Review Comment:
   I tried making 
   ` private var foreachWriter: ForeachWriter[_] = null`
   
   Then here if I do
   ```
   val sink = ForeachWriterTable[_](foreachWriter, ds.exprEnc)
   ```
   I get:
   ```
   [error] /home/wei.liu/oss-spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:358:37: unbound wildcard type
   [error]       val sink = ForeachWriterTable[_](foreachWriter, ds.exprEnc)
   [error]                                     ^
   [error] one error found
   ```
   
   If I do 
   ```
   val sink = ForeachWriterTable[Any](foreachWriter, ds.exprEnc)
   ```
   I get:
   ```
   [error] /home/wei.liu/oss-spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:358:36: overloaded method value apply with alternatives:
   [error]   (writer: org.apache.spark.sql.ForeachWriter[Any],converter: Either[org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Any],org.apache.spark.sql.catalyst.InternalRow => Any])org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable[Any] <and>
   [error]   (writer: org.apache.spark.sql.ForeachWriter[Any],encoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Any])org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable[_]
   [error]  cannot be applied to (org.apache.spark.sql.ForeachWriter[_$2], org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[T])
   [error]       val sink = ForeachWriterTable[Any](foreachWriter, ds.exprEnc)
   [error]                                    ^
   [error] one error found
   ```





[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1193333270


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,21 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  StreamingForeachWriter foreach = 13;

Review Comment:
   How about renaming this to `foreach_writer`? That matches the `Writer` suffix used in the message name. Leaving it as just `foreach` looks quite generic. 



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   What breaks if we do reuse it? Given there will be more updates to PythonUDF (libraries etc.), reusing it would reduce work in the future. 



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   > `asInstanceOf[org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]] ?`
   
   Yes, what happens if we do this? 
   



##########
python/pyspark/sql/streaming/readwriter.py:
##########
@@ -1122,6 +1122,85 @@ def trigger(
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self
 
+    @staticmethod
+    def construct_foreach_function(f: Union[Callable[[Row], None], "SupportsProcess"]):

Review Comment:
   Nice, thanks for reusing it.
   The name should start with `_` since it is meant for internal use. 





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1193358986


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   Nothing actually breaks if we reuse it. And I'm fine with that if people think that's the correct way. 
   
   But I really think it's not semantically right to call the `foreach` function a Python UDF. They are not the same. And `foreachWriter` doesn't require `output_type` and `eval_type` to be set.
   
   A better way is to create a new proto message called `pythonFunction` that contains the `bytes command` and `string python_ver`, and have `PythonUDF` include it as a member. Then here we can just reuse `pythonFunction`. 
   
   But I'm not an expert on protobuf, so I'm not sure whether this is a breaking change. 
   
   @HyukjinKwon @amaliujia Can we get some input from you guys? Thank you!! 
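
The composition proposed in this comment can be sketched with plain dataclasses standing in for the proto messages. This is an illustration under stated assumptions, not the actual schema: the field names follow the comment (`command`, `python_ver`, `output_type`, `eval_type`), the `function` field name and all field types are guesses, and the values are placeholders.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PythonFunction:
    """Stand-in for the proposed shared message: just the payload and the Python version."""

    command: bytes
    python_ver: str


@dataclass(frozen=True)
class PythonUDF:
    """Stand-in for the UDF message: wraps the function and adds UDF-only fields."""

    function: PythonFunction
    output_type: str
    eval_type: int


@dataclass(frozen=True)
class StreamingForeachWriter:
    """Stand-in for the foreach message: wraps the function and nothing else."""

    function: PythonFunction


# Placeholder values, for illustration only.
fn = PythonFunction(command=b"<serialized function>", python_ver="3.10")
udf = PythonUDF(function=fn, output_type="int", eval_type=0)
writer = StreamingForeachWriter(function=fn)
```

With this shape the foreach writer carries only what it needs, while `PythonUDF` keeps its extra fields. Whether moving existing `PythonUDF` fields into a nested message is wire-compatible is the open question raised in the comment, and the sketch does not answer it.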





[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1195715408


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -354,7 +355,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       query
     } else if (source == SOURCE_NAME_FOREACH) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
-      val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
+      val sink = ForeachWriterTable[Any](foreachWriter,
+        ds.exprEnc.asInstanceOf[ExpressionEncoder[Any]])

Review Comment:
   Is this casting necessary? 
   What if we replace 'Any' with '_' for `foreachWriter`? 





[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194147496


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -455,6 +465,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     this
   }
 
+  private[sql] def foreachConnect(writer: PythonForeachWriter): DataStreamWriter[T] = {

Review Comment:
   Will this work for Scala Connect too? Otherwise, the earlier name `foreachPython` sounds better.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   If we can't do that, we can go with this solution. 
   In that case, I think we can simplify the implementation at line 360. I will comment there. 



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +554,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var connectForeachWriter: PythonForeachWriter = null

Review Comment:
   What if we declare `foreachWriter` as `ForeachWriter[_]`? 
   I think we can remove `connectForeachWriter`. 
   At line 360, we could have `val sink = ForeachWriterTable[_](...)`





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194200121


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +554,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var connectForeachWriter: PythonForeachWriter = null

Review Comment:
   Yes that would work I think. I'll change it!





[GitHub] [spark] HyukjinKwon closed pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API
URL: https://github.com/apache/spark/pull/41026




[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184091839


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   Can we use the `PythonUDF` message, even though `output_type` is not needed?



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {

Review Comment:
   `Foreach` seems too short and does not convey much context. How about `StreamingForeachSink` or `StreamingForeachWriter`?



##########
python/pyspark/sql/connect/streaming/readwriter.py:
##########
@@ -339,7 +342,9 @@ def table(self, tableName: str) -> "DataFrame":
 
 
 class DataStreamWriter:
-    def __init__(self, plan: "LogicalPlan", session: "SparkSession") -> None:
+    def __init__(
+        self, plan: "LogicalPlan", session: "SparkSession"
+    ) -> None:

Review Comment:
   Not needed? 



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -455,6 +463,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     this
   }
 
+  def foreachPython(writer: PythonForeachWriter): DataStreamWriter[T] = {

Review Comment:
   should be private? 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1385,6 +1385,21 @@ class SparkConnectPlanner(val session: SparkSession) {
       accumulator = null)
   }
 
+  private def transformPythonForeachFunction(fun: proto.Foreach): SimplePythonFunction = {

Review Comment:
   Could use `PythonUDF` in the request proto and call `transformPythonFunction()` here (actually, we may not need this new function).



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -352,10 +355,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         catalogTable = catalogTable)
       resultDf.createOrReplaceTempView(query.name)
       query
-    } else if (source == SOURCE_NAME_FOREACH) {
+    } else if (source == SOURCE_NAME_FOREACH && foreachWriter != null) {

Review Comment:
   Refactor this into `if (source == SOURCE_NAME_FOREACH) {`.
   Inside, it looks like we can set up `sink` based on `if (foreachWriter != null) ...`



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;

Review Comment:
   remove `optional`. These are optional already. 



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I will think about this.. it might be doable since it is ok in PySpark.





[GitHub] [spark] pengzhon-db commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "pengzhon-db (via GitHub)" <gi...@apache.org>.
pengzhon-db commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1192856772


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   What if you just try to cast with `asInstanceOf[org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]]` ?





[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194455270


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    this.source = SOURCE_NAME_FOREACH
+    this.foreachWriter = if (writer != null) {
+      ds.sparkSession.sparkContext.clean(writer.asInstanceOf[ForeachWriter[Any]])
+    } else {
+      throw new IllegalArgumentException("foreach writer cannot be null")
+    }
+    this
+  }
+
+  private[sql] def foreachConnect(writer: ForeachWriter[Any]): DataStreamWriter[T] = {

Review Comment:
   Rename it to `foreachImplementation()`? 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -40,7 +41,11 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.Streami
 import org.apache.spark.connect.proto.WriteStreamOperationStart
 import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
 import org.apache.spark.ml.{functions => MLFunctions}
+<<<<<<< HEAD

Review Comment:
   Fix.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    this.source = SOURCE_NAME_FOREACH

Review Comment:
   call `foreachConnect()` here, rather than reimplementing it. 
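
The delegation suggested here can be sketched as below. This is a simplified Python model, not the Scala code under review. The class `DataStreamWriterSketch` and the method `_foreach_impl` are hypothetical names; only the null check and the `"foreach"` source assignment mirror the diff above.

```python
class DataStreamWriterSketch:
    """Hypothetical stand-in for DataStreamWriter, reduced to the foreach path."""

    def __init__(self):
        self.source = None
        self.foreach_writer = None

    def foreach(self, writer):
        # Public entry point: no logic of its own, it only delegates.
        return self._foreach_impl(writer)

    def _foreach_impl(self, writer):
        # Single place that validates the writer and records the sink.
        if writer is None:
            raise ValueError("foreach writer cannot be null")
        self.source = "foreach"
        self.foreach_writer = writer
        return self
```

Keeping the validation in one method means the public and internal entry points cannot drift apart.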



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
 
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
-      // DataStreamReader
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
-      ),
-
       // DataStreamWriter
       ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
-      ),
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),

Review Comment:
   Actually we don't need this. The method is private to `[sql]`.



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
 
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
-      // DataStreamReader
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
-      ),
-
       // DataStreamWriter
       ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
-      ),
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),

Review Comment:
   This name should be in sync with the method name in DataStreamWriter. I like `foreachPython`. 
   
   Also add a comment here that this is not public API.



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   Sure, we can go with your current approach. 





[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194458342


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2385,6 +2385,13 @@ class SparkConnectPlanner(val session: SparkSession) {
       writer.queryName(writeOp.getQueryName)
     }
 
+    if (writeOp.hasForeachWriter) {
+      val foreach = writeOp.getForeachWriter.getPythonWriter
+      val pythonFcn = transformPythonFunction(foreach)
+      writer.foreachConnect(
+        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])

Review Comment:
   Do we need the cast here? `PythonForeachWriter` already extends ForeachWriter.





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194484668


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -40,7 +41,11 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.Streami
 import org.apache.spark.connect.proto.WriteStreamOperationStart
 import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
 import org.apache.spark.ml.{functions => MLFunctions}
+<<<<<<< HEAD

Review Comment:
   This is already fixed. Please check the newest commit!





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1190253907


##########
python/pyspark/sql/utils.py:
##########
@@ -119,6 +124,79 @@ class Java:
         implements = ["org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchFunction"]
 
 
+def construct_foreach_function(f: Union[Callable[[Row], None], "SupportsProcess"]):
+    from pyspark.taskcontext import TaskContext
+
+    if callable(f):
+        # The provided object is a callable function that is supposed to be called on each row.
+        # Construct a function that takes an iterator and calls the provided function on each
+        # row.
+        def func_without_process(_: Any, iterator: Iterator) -> Iterator:
+            for x in iterator:
+                f(x)  # type: ignore[operator]
+            return iter([])
+
+        return func_without_process
+
+    else:
+        # The provided object is not a callable function. Then it is expected to have a
+        # 'process(row)' method, and optional 'open(partition_id, epoch_id)' and
+        # 'close(error)' methods.
+
+        if not hasattr(f, "process"):
+            raise AttributeError("Provided object does not have a 'process' method")

Review Comment:
   Thanks! Will do!
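
For readers following the `construct_foreach_function` hunk above, here is a minimal, self-contained sketch of the dispatch it describes: a plain callable is applied to every row, while a non-callable object must provide `process(row)` and may provide `open(partition_id, epoch_id)` and `close(error)`. The name `make_foreach_function`, the explicit `epoch_id` argument, and the `ListWriter` helper are assumptions made for illustration. The function in the PR reads its context from `TaskContext` instead and may differ in detail.

```python
from typing import Any, Callable, Iterable, Iterator


def make_foreach_function(
    f: Any, epoch_id: int = 0
) -> Callable[[int, Iterable[Any]], Iterator[Any]]:
    """Wrap `f` into a function that consumes one partition's rows."""
    if callable(f):
        # Plain function: call it on each row and emit nothing.
        def func_without_process(_: int, rows: Iterable[Any]) -> Iterator[Any]:
            for row in rows:
                f(row)
            return iter([])

        return func_without_process

    # Otherwise the object must expose process(row); open/close are optional.
    if not hasattr(f, "process"):
        raise AttributeError("Provided object does not have a 'process' method")
    if not callable(f.process):
        raise TypeError("Attribute 'process' in provided object is not callable")

    open_fn = getattr(f, "open", None)
    close_fn = getattr(f, "close", None)

    def func_with_open_process_close(
        partition_id: int, rows: Iterable[Any]
    ) -> Iterator[Any]:
        error = None
        try:
            # Rows are processed only if open() is absent or returns a truthy value.
            should_process = True
            if open_fn is not None:
                should_process = bool(open_fn(partition_id, epoch_id))
            if should_process:
                for row in rows:
                    f.process(row)
        except Exception as e:
            error = e
            raise
        finally:
            # close() always runs and receives the error, if any.
            if close_fn is not None:
                close_fn(error)
        return iter([])

    return func_with_open_process_close


class ListWriter:
    """Hypothetical writer object that records every lifecycle call."""

    def __init__(self) -> None:
        self.events = []

    def open(self, partition_id: int, epoch_id: int) -> bool:
        self.events.append(("open", partition_id, epoch_id))
        return True

    def process(self, row: Any) -> None:
        self.events.append(("process", row))

    def close(self, error: Any) -> None:
        self.events.append(("close", error))
```

Both branches return an empty iterator because a foreach sink produces no output rows.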





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184008423


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I definitely think a discussion is needed here. I added this because `foreachWriter` cannot be used: its type parameter needs to be the same as the `DataStreamWriter`'s, which is `Row`, but `PythonForeachWriter` extends `ForeachWriter[UnsafeRow]`:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
   
   Therefore I can't just create a `PythonForeachWriter` and call `writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
   
   Compiler will complain: 
   ```
   [error] /home/wei.liu/oss-spark/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:2248:22: type mismatch;
   [error]  found   : org.apache.spark.sql.execution.python.PythonForeachWriter
   [error]  required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
   [error]       writer.foreach(new PythonForeachWriter(pythonFcn, parseSchema(foreach.getSchema)))
   [error] 
   ```
   
   The original caller in Python doesn't do this additional step and just calls `foreach`:
   https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
   
   Maybe that circumvents this check somehow. I'm also very interested if anyone knows why that is.
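
One plausible explanation, offered as an assumption rather than something confirmed in this thread: the PySpark caller reaches `foreach` through Py4J, which invokes the method reflectively at runtime, where JVM generic type arguments are erased, so the compile-time check shown above never runs. The Python sketch below is only an analogy for that distinction. The classes are hypothetical stand-ins for the Spark types; a static checker such as mypy would flag the call, yet it runs fine.

```python
from typing import Generic, TypeVar

T = TypeVar("T")  # invariant by default, like ForeachWriter[T] in Scala


class Row:
    """Hypothetical stand-in for org.apache.spark.sql.Row."""


class UnsafeRow:
    """Hypothetical stand-in for UnsafeRow."""


class ForeachWriter(Generic[T]):
    def process(self, value: T) -> None:
        raise NotImplementedError


class PythonForeachWriter(ForeachWriter[UnsafeRow]):
    def __init__(self) -> None:
        self.seen = []

    def process(self, value: UnsafeRow) -> None:
        self.seen.append(value)


def foreach(writer: "ForeachWriter[Row]") -> "ForeachWriter[Row]":
    # Statically, ForeachWriter[UnsafeRow] is not a ForeachWriter[Row].
    # At runtime the type argument is not checked, so the call succeeds.
    return writer


accepted = foreach(PythonForeachWriter())  # a static checker would reject this line
```

The Scala call in `SparkConnectPlanner` is compiled, so it sees the static signature and needs an explicit cast or a wider parameter type.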








[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184224749


##########
python/pyspark/sql/connect/streaming/readwriter.py:
##########
@@ -339,7 +342,9 @@ def table(self, tableName: str) -> "DataFrame":
 
 
 class DataStreamWriter:
-    def __init__(self, plan: "LogicalPlan", session: "SparkSession") -> None:
+    def __init__(
+        self, plan: "LogicalPlan", session: "SparkSession"
+    ) -> None:

Review Comment:
   Hmm, not sure why this happened; maybe the reformatter did it. I'll remove it.





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1193358986


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   Nothing actually breaks if we reuse it, and I'm fine with that if people think it's the correct way. 
   
   But I really don't think it's semantically right to call the `foreach` function a Python UDF. They are not the same, and `foreachWriter` doesn't require `output_type` and `eval_type` to be set.
   
   A better way would be to create a new proto message called `pythonFunction` that contains `bytes command` and `string python_ver`, and have `PythonUDF` include it as a member. Here, for the foreach writer, we can then just reuse `pythonFunction`. 
   
   But I'm not an expert on Protobuf, so I'm not sure whether this is a breaking change. 
   
   @HyukjinKwon @amaliujia Can we get some input from you guys? Thank you!! 





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194154060


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -455,6 +465,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     this
   }
 
+  private[sql] def foreachConnect(writer: PythonForeachWriter): DataStreamWriter[T] = {

Review Comment:
   Yes, actually this change is due to the change I added in Scala foreach: https://github.com/apache/spark/pull/41129/files#r1194152295
   
   The writer will be a generic `ForeachWriter[Any]`. Here I used `PythonForeachWriter` because at this stage it doesn't need to be generic.





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194488494


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2385,6 +2385,13 @@ class SparkConnectPlanner(val session: SparkSession) {
       writer.queryName(writeOp.getQueryName)
     }
 
+    if (writeOp.hasForeachWriter) {
+      val foreach = writeOp.getForeachWriter.getPythonWriter
+      val pythonFcn = transformPythonFunction(foreach)
+      writer.foreachConnect(
+        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])

Review Comment:
   Yes we do need it:
   ```
   [error] /home/wei.liu/oss-spark/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:2392:9: type mismatch;
   [error]  found   : org.apache.spark.sql.execution.python.PythonForeachWriter
   [error]  required: org.apache.spark.sql.ForeachWriter[Any]
   ```





[GitHub] [spark] HyukjinKwon commented on pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #41026:
URL: https://github.com/apache/spark/pull/41026#issuecomment-1550823334

   Merged to master.




[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194355729


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -532,7 +547,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
-  private var foreachWriter: ForeachWriter[T] = null
+  private var foreachWriter: ForeachWriter[Any] = null
+
+  private var foreachWriterEncoder: Either[ExpressionEncoder[Any], InternalRow => Any] =
+    Left(ds.exprEnc.asInstanceOf[ExpressionEncoder[Any]])

Review Comment:
   @hvanhovell Hi Herman, could you sign off on this change? It can be viewed together with this PR for more context:
   https://github.com/apache/spark/pull/41129/files#r1194152295
   
   Thanks in advance!
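
   For context, the `Either` in the diff above appears to let the sink convert each internal row in one of two ways: through an encoder (`Left`) or through a plain function (`Right`). A minimal sketch of that pattern with hypothetical stand-ins (`InternalRowLike`, `DecoderLike` and `convert` are illustrative names, not Spark classes); how the field is actually consumed in Spark is an assumption here:

   ```scala
   // Hypothetical stand-ins for InternalRow and ExpressionEncoder; not Spark classes.
   final case class InternalRowLike(values: Vector[Any])
   trait DecoderLike[T] { def fromRow(row: InternalRowLike): T }

   object EitherConverterDemo {
     type Converter = Either[DecoderLike[Any], InternalRowLike => Any]

     // Left: decode the row with an encoder-like object.
     // Right: apply a plain function to the row.
     def convert(converter: Converter, row: InternalRowLike): Any = converter match {
       case Left(decoder) => decoder.fromRow(row)
       case Right(fn)     => fn(row)
     }

     // Decoder that returns the first column of the row.
     val firstColumn: DecoderLike[Any] = new DecoderLike[Any] {
       def fromRow(row: InternalRowLike): Any = row.values.head
     }

     def main(args: Array[String]): Unit = {
       val row = InternalRowLike(Vector("a", 1))
       println(convert(Left(firstColumn), row))                 // decoded first column
       println(convert(Right((r: InternalRowLike) => r), row))  // row passed through as-is
     }
   }
   ```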



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -532,7 +545,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
-  private var foreachWriter: ForeachWriter[T] = null
+  private var foreachWriter: ForeachWriter[Any] = null

Review Comment:
   @hvanhovell Hi Herman, could you sign off on this change? It can be viewed together with this PR for more context:
   https://github.com/apache/spark/pull/41129/files#r1194152295
   
   Thanks in advance!





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184008423


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I definitely think a discussion is needed here. I added this because `foreachWriter` cannot be used here: its type parameter must match the `DataStreamWriter`'s, which is `Row`, but `PythonForeachWriter` extends `ForeachWriter[UnsafeRow]`:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
   
   Therefore I can't just create a `PythonForeachWriter` and call `writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
   
   The compiler complains:
   ```
   [error] /home/wei.liu/oss-spark/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:2248:22: type mismatch;
   [error]  found   : org.apache.spark.sql.execution.python.PythonForeachWriter
   [error]  required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
   [error]       writer.foreach(new PythonForeachWriter(pythonFcn, parseSchema(foreach.getSchema)))
   ```
   
   The original caller in Python skips this additional step and just calls `foreach`:
   https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
   
   Maybe that circumvents the check somehow. I'd be very interested if anyone knows why.
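
   One plausible explanation, stated as an assumption rather than something confirmed in this thread: the Python side reaches the JVM through Py4J, which invokes methods reflectively at runtime. Generic type arguments are erased by then, so the `ForeachWriter[Row]` versus `ForeachWriter[UnsafeRow]` distinction is never checked. A minimal sketch of that effect with hypothetical stand-in classes (`Writer`, `StreamWriterLike`, `PyWriter` are illustrative names, not Spark classes):

   ```scala
   // Hypothetical stand-ins; none of these are Spark classes.
   abstract class Writer[T] { def process(value: T): String }
   class RowLike
   class UnsafeRowLike
   class PyWriter extends Writer[UnsafeRowLike] {
     def process(value: UnsafeRowLike): String = "processed"
   }

   class StreamWriterLike {
     var writer: Writer[RowLike] = null
     def foreach(w: Writer[RowLike]): StreamWriterLike = { writer = w; this }
   }

   object ErasureDemo {
     // Calls foreach reflectively, roughly the way a bridge such as Py4J would.
     def registerViaReflection(target: StreamWriterLike, w: AnyRef): Unit = {
       // After erasure the method signature is just foreach(Writer).
       val method = classOf[StreamWriterLike].getMethod("foreach", classOf[Writer[_]])
       method.invoke(target, w)
     }

     def main(args: Array[String]): Unit = {
       val target = new StreamWriterLike
       // target.foreach(new PyWriter) does not compile: found Writer[UnsafeRowLike],
       // required Writer[RowLike]. The reflective call performs no such check.
       registerViaReflection(target, new PyWriter)
       println(target.writer != null)
     }
   }
   ```

   A statically typed caller such as `SparkConnectPlanner` does not get this loophole, which would explain why the same call compiles from nowhere in Scala yet works from Python.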





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184224749


##########
python/pyspark/sql/connect/streaming/readwriter.py:
##########
@@ -339,7 +342,9 @@ def table(self, tableName: str) -> "DataFrame":
 
 
 class DataStreamWriter:
-    def __init__(self, plan: "LogicalPlan", session: "SparkSession") -> None:
+    def __init__(
+        self, plan: "LogicalPlan", session: "SparkSession"
+    ) -> None:

Review Comment:
   Hmm, not sure why this happened; maybe the reformatter did it. I'll change it back.





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184008423


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I definitely think a discussion is needed here. I added this because `foreachWriter` cannot be used: its type parameter must match the `DataStreamWriter`'s, which is `Row`, but `PythonForeachWriter` extends `ForeachWriter[UnsafeRow]`:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
   
   Therefore I can't just create a `PythonForeachWriter` and call `writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
   
   The original caller in Python skips this additional step and just calls `foreach`:
   https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
   
   Maybe that circumvents the check somehow. I'd be very interested if anyone knows why.





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1184225072


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;

Review Comment:
   I remember that some method such as `hasForeach` complains if we do that. I will try it and get back to you.





[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1185316680


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -354,7 +357,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       query
     } else if (source == SOURCE_NAME_FOREACH) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
-      val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
+      val sink = if (foreachWriter != null) {
+        ForeachWriterTable[T](foreachWriter, ds.exprEnc)
+      } else {
+        new ForeachWriterTable[UnsafeRow](
+          pythonForeachWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))
+      }

Review Comment:
   @rangadi Doing this results in the following error:
   ```
   [error] /__w/oss-spark/oss-spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:360:18: inferred existential type org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable[_1]( forSome { type _1 >: _$1 with org.apache.spark.sql.catalyst.expressions.UnsafeRow; type _$1 }), which cannot be expressed by wildcards,  should be enabled
   [error] by making the implicit value scala.language.existentials visible.
   [error] ----
   [error] This can be achieved by adding the import clause 'import scala.language.existentials'
   [error] or by setting the compiler option -language:existentials.
   [error] See the Scaladoc for value scala.language.existentials for a discussion
   [error] why the feature should be explicitly enabled.
   [error]       val sink = if (foreachWriter != null) {
   [error]                  ^
   [error] one error found
   ```
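
   The two branches produce `ForeachWriterTable[T]` and `ForeachWriterTable[UnsafeRow]`, so the compiler has to infer a common type for `sink`, and that inferred type is the existential it complains about. Besides the import the message suggests, one option that might work is to annotate the `val` with a wildcard type so that nothing has to be inferred. A minimal sketch with a hypothetical stand-in class (`Table` is an illustrative name, not `ForeachWriterTable`); whether this fits the real code is an assumption:

   ```scala
   // Hypothetical stand-in for an invariant generic class such as ForeachWriterTable[T].
   class Table[T](val name: String)

   object SinkTypeDemo {
     def pick[T](typed: Table[T], fallback: Table[String], useTyped: Boolean): Table[_] = {
       // The explicit Table[_] annotation gives both branches an expected type,
       // so the compiler does not need to infer a common type for them.
       val sink: Table[_] = if (useTyped) typed else fallback
       sink
     }

     def main(args: Array[String]): Unit =
       println(pick(new Table[Int]("typed"), new Table[String]("fallback"), useTyped = false).name)
   }
   ```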


