You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/05/11 05:16:39 UTC

[GitHub] [spark] WweiL opened a new pull request, #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

WweiL opened a new pull request, #41129:
URL: https://github.com/apache/spark/pull/41129

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1198063524


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -216,6 +216,7 @@ message WriteStreamOperationStart {
 message StreamingForeachWriter {
   oneof writer {
     PythonUDF python_writer = 1;
+    ScalarScalaUDF scala_writer = 2;

Review Comment:
   Yeah, but feel this adds more layers to your design. Do you expect to expand the writer to be more than bytes? Would you add more fields in the UDFs or in this StreamingForeachWriter?
   
   If you plan to add in StreamingForeachWriter, then it does not matter too much if you use UDF or bytes here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1223638394


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2445,10 +2451,24 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.datasetEncoder == null) {
+          // datasetEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(

Review Comment:
   Is this case possible yet un this PR? Which unit test would trigger it (may be `"forEach Int"`)? 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -202,6 +208,28 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
     this
   }
 
+  /**
+   * Sets the output of the streaming query to be processed using the provided writer object.
+   * object. See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
+   * semantics.
+   * @since 3.5.0
+   */
+  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    // ds.encoder equal to UnboundRowEncoder means type parameter T is Row,
+    // which is not able to be serialized. Server will detect this and use default encoder.
+    val rowEncoder = if (ds.encoder != UnboundRowEncoder) {

Review Comment:
   What is TODO about? There are no details. The ticket just links back to this comment. Can we have the issue described here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205890281


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   But we don't know if this is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205870302


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   Yes, we are not supporting custom encoder yet. The unit test is disabled. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1213787122


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -163,13 +165,106 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       assert(!terminated)
 
       q.stop()
-      // TODO (SPARK-43032): uncomment below
-      // eventually(timeout(1.minute)) {
-      // q.awaitTermination()
-      // }
+      eventually(timeout(1.minute)) {
+        q.awaitTermination()
+      }
+    }
+  }
+
+  class TestForeachWriter[T] extends ForeachWriter[T] {

Review Comment:
   Thanks! That worked!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon closed pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support
URL: https://github.com/apache/spark/pull/41129


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1190645575


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2293,6 +2309,19 @@ class SparkConnectPlanner(val session: SparkSession) {
       writer.queryName(writeOp.getQueryName)
     }
 
+    if (writeOp.hasForeach) {
+      if (writeOp.getForeach.hasPythonWriter) {
+        val foreach = writeOp.getForeach.getPythonWriter
+        val pythonFcn = transformPythonForeachFunction(foreach)
+        writer.foreachPython(new PythonForeachWriter(pythonFcn, dataset.schema))
+      } else {
+        val scalaFcn = Utils.deserialize[ForeachWriter[Row]]( // TODO(wei): row? String?

Review Comment:
   ```
   java.lang.ClassNotFoundException: ammonite.$sess.cmd3$Helper$$anon$1
   	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
   	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
   	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
   	at java.lang.Class.forName0(Native Method)
   	at java.lang.Class.forName(Class.java:348)
   	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
   	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
   	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
   	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
   	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
   	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
   	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
   	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
   	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
   	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
   	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
   	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
   	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
   	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
   	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
   	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
   	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
   	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
   	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
   	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
   	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
   	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1198057053


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2386,10 +2393,26 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.rowEncoder == null) {
+          // rowEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(
+            foreachWriterPkt.rowEncoder.asInstanceOf[AgnosticEncoder[Any]])

Review Comment:
   It is okay for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1197276153


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2386,10 +2393,26 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.rowEncoder == null) {
+          // rowEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(
+            foreachWriterPkt.rowEncoder.asInstanceOf[AgnosticEncoder[Any]])

Review Comment:
   @zhenlineo agree. Should we try moving unpacking to execution time or doing this on server RPC thread is ok for now? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1196819312


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import com.google.protobuf.ByteString
+import java.io.{InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, rowEncoder: AgnosticEncoder[_])

Review Comment:
   Why is this required? It is not not doing anything other than default serialization. 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2386,10 +2393,26 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.rowEncoder == null) {
+          // rowEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(
+            foreachWriterPkt.rowEncoder.asInstanceOf[AgnosticEncoder[Any]])

Review Comment:
   @hvanhovell is this safe? We are deserializing arbitrary Encoder class from user. Would it need user's jars? Otherwise, it could be a security issue.



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -140,10 +140,88 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       assert(!terminated)
 
       q.stop()
-      // TODO (SPARK-43032): uncomment below
-      // eventually(timeout(1.minute)) {
-      // q.awaitTermination()
-      // }
+      eventually(timeout(1.minute)) {
+        q.awaitTermination()
+      }
+    }
+  }
+
+  test("foreach Row") {
+    withTempPath { f =>
+      val path = f.getCanonicalPath + "/output"
+      val writer = new ForeachWriter[Row] {
+        var fileWriter: FileWriter = _
+
+        def open(partitionId: Long, version: Long): Boolean = {
+          fileWriter = new FileWriter(path, true)
+          true
+        }
+
+        def process(row: Row): Unit = {
+          fileWriter.write(row.mkString(", "))
+          fileWriter.write("\n")
+        }
+
+        def close(errorOrNull: Throwable): Unit = {
+          fileWriter.close()
+        }
+      }

Review Comment:
   Scala improvement:
   Define this implementation as `MyForeachWriter[T]` and use it both tests. `MyForeachWriter[Row]` and `MyForeachWriter[Int]`. 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2386,10 +2393,26 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.rowEncoder == null) {
+          // rowEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(
+            foreachWriterPkt.rowEncoder.asInstanceOf[AgnosticEncoder[Any]])

Review Comment:
   Should we instead always convert it to row encoder based on schema? 



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -140,10 +140,88 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       assert(!terminated)
 
       q.stop()
-      // TODO (SPARK-43032): uncomment below
-      // eventually(timeout(1.minute)) {
-      // q.awaitTermination()
-      // }
+      eventually(timeout(1.minute)) {
+        q.awaitTermination()
+      }
+    }
+  }
+
+  test("foreach Row") {

Review Comment:
   Could you add one more test with custom type? Say `case class Person(name: String, id: Int)`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1190645274


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -202,6 +206,21 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
     this
   }
 
+  /**
+   * Sets the output of the streaming query to be processed using the provided writer object.
+   * object. See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
+   * semantics.
+   * @since 3.5.0
+   */
+  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    val serialized = Utils.serialize(writer)

Review Comment:
   Serialize an arbitrary class instance directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #41129:
URL: https://github.com/apache/spark/pull/41129#issuecomment-1544324125

   @WweiL I can run your test succesfully with SBT commands:
   ```
   ./build/sbt package -Phive -Pconnect
   sbt "testOnly org.apache.spark.sql.streaming.StreamingQuerySuite"
   ```
   
   You got the `ClassNotFound` exception probably because you were trying to run via the shell?
   It would only work with Ammonite shell. If you define the user defined writer via the shell, it should also work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #41129:
URL: https://github.com/apache/spark/pull/41129#issuecomment-1544474453

   If you hit `stream classdesc serialVersionUID = -2719662620125650908, local class serialVersionUID = 6534627183855972490`
   
   It means the client has a version and server has another, when trying to mapping them java failed to match the UID.
   The solution is to let the client use the server version and move the server side version into a `sql-util` package.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1190646225


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -17,131 +17,172 @@
 
 package org.apache.spark.sql.streaming
 
-import org.scalatest.concurrent.Eventually.eventually
-import org.scalatest.concurrent.Futures.timeout
-import org.scalatest.time.SpanSugar._
+import java.io.FileWriter
 
-import org.apache.spark.sql.SQLHelper
+//import org.scalatest.concurrent.Eventually.eventually
+//import org.scalatest.concurrent.Futures.timeout
+//import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.{ForeachWriter, Row, SQLHelper}
 import org.apache.spark.sql.connect.client.util.RemoteSparkSession
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.window
+//import org.apache.spark.sql.functions.col
+//import org.apache.spark.sql.functions.window
 
 class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
 
-  test("Streaming API with windowed aggregate query") {
-    // This verifies standard streaming API by starting a streaming query with windowed count.
-    withSQLConf(
-      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
-    ) {
-      val readDF = spark.readStream
-        .format("rate")
-        .option("rowsPerSecond", "10")
-        .option("numPartitions", "1")
-        .load()
-
-      // Verify schema (results in sending an RPC)
-      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
-
-      val countsDF = readDF
-        .withWatermark("timestamp", "10 seconds")
-        .groupBy(window(col("timestamp"), "5 seconds"))
-        .count()
-        .selectExpr("window.start as timestamp", "count as num_events")
-
-      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
-
-      // Start the query
-      val queryName = "sparkConnectStreamingQuery"
-
-      val query = countsDF.writeStream
-        .format("memory")
-        .queryName(queryName)
-        .trigger(Trigger.ProcessingTime("1 second"))
-        .start()
-
-      try {
-        // Verify some of the API.
-        assert(query.isActive)
-
-        eventually(timeout(10.seconds)) {
-          assert(query.status.isDataAvailable)
-          assert(query.recentProgress.nonEmpty) // Query made progress.
-        }
-
-        query.explain() // Prints the plan to console.
-        // Consider verifying explain output by capturing stdout similar to
-        // test("Dataset explain") in ClientE2ETestSuite.
-
-      } finally {
-        // Don't wait for any processed data. Otherwise the test could take multiple seconds.
-        query.stop()
+//  test("Streaming API with windowed aggregate query") {
+//    // This verifies standard streaming API by starting a streaming query with windowed count.
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val readDF = spark.readStream
+//        .format("rate")
+//        .option("rowsPerSecond", "10")
+//        .option("numPartitions", "1")
+//        .load()
+//
+//      // Verify schema (results in sending an RPC)
+//      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+//
+//      val countsDF = readDF
+//        .withWatermark("timestamp", "10 seconds")
+//        .groupBy(window(col("timestamp"), "5 seconds"))
+//        .count()
+//        .selectExpr("window.start as timestamp", "count as num_events")
+//
+//      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
+//
+//      // Start the query
+//      val queryName = "sparkConnectStreamingQuery"
+//
+//      val query = countsDF.writeStream
+//        .format("memory")
+//        .queryName(queryName)
+//        .trigger(Trigger.ProcessingTime("1 second"))
+//        .start()
+//
+//      try {
+//        // Verify some of the API.
+//        assert(query.isActive)
+//
+//        eventually(timeout(10.seconds)) {
+//          assert(query.status.isDataAvailable)
+//          assert(query.recentProgress.nonEmpty) // Query made progress.
+//        }
+//
+//        query.explain() // Prints the plan to console.
+//        // Consider verifying explain output by capturing stdout similar to
+//        // test("Dataset explain") in ClientE2ETestSuite.
+//
+//      } finally {
+//        // Don't wait for any processed data. Otherwise the test could take multiple seconds.
+//        query.stop()
+//
+//        // The query should still be accessible after stopped.
+//        assert(!query.isActive)
+//        assert(query.recentProgress.nonEmpty)
+//      }
+//    }
+//  }
+
+//  test("Streaming table API") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      spark.sql("DROP TABLE IF EXISTS my_table")
+//
+//      withTempPath { ckpt =>
+//        val q1 = spark.readStream
+//          .format("rate")
+//          .load()
+//          .writeStream
+//          .option("checkpointLocation", ckpt.getCanonicalPath)
+//          .toTable("my_table")
+//
+//        val q2 = spark.readStream
+//          .table("my_table")
+//          .writeStream
+//          .format("memory")
+//          .queryName("my_sink")
+//          .start()
+//
+//        try {
+//          q1.processAllAvailable()
+//          q2.processAllAvailable()
+//          eventually(timeout(10.seconds)) {
+//            assert(spark.table("my_sink").count() > 0)
+//          }
+//        } finally {
+//          q1.stop()
+//          q2.stop()
+//          spark.sql("DROP TABLE my_table")
+//        }
+//      }
+//    }
+//  }
+
+//  test("awaitTermination") {
+//    withSQLConf(
+//      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+//    ) {
+//      val q = spark.readStream
+//        .format("rate")
+//        .load()
+//        .writeStream
+//        .format("memory")
+//        .queryName("test")
+//        .start()
+//
+//      val start = System.nanoTime
+//      val terminated = q.awaitTermination(500)
+//      val end = System.nanoTime
+//      assert((end - start) / 1e6 >= 500)
+//      assert(!terminated)
+//
+//      q.stop()
+//      // TODO (SPARK-43032): uncomment below
+//      // eventually(timeout(1.minute)) {
+//      // q.awaitTermination()
+//      // }
+//    }
+//  }
+
+  test("foreach") {
+    withTempPath {
+      f =>
+        val path = f.getCanonicalPath + "/output"

Review Comment:
   likely not work as foreach runs on worker



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205854510


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   > Either implementation is not needed.
   Yea that makes sense, let me change it.
   > Actually this change is not needed at all, I think.
   You mean we don't even need to pass encoder from client?



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   > Either implementation is not needed.
   
   Yea that makes sense, let me change it.
   > Actually this change is not needed at all, I think.
   
   You mean we don't even need to pass encoder from client?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1190736102


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2293,6 +2309,19 @@ class SparkConnectPlanner(val session: SparkSession) {
       writer.queryName(writeOp.getQueryName)
     }
 
+    if (writeOp.hasForeach) {
+      if (writeOp.getForeach.hasPythonWriter) {
+        val foreach = writeOp.getForeach.getPythonWriter
+        val pythonFcn = transformPythonForeachFunction(foreach)
+        writer.foreachPython(new PythonForeachWriter(pythonFcn, dataset.schema))
+      } else {
+        val scalaFcn = Utils.deserialize[ForeachWriter[Row]]( // TODO(wei): row? String?

Review Comment:
   After starting server with `./connector/connect/bin/spark-connect-shell` this is not an issue anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #41129:
URL: https://github.com/apache/spark/pull/41129#issuecomment-1564985008

   Getting a scala test related error, the code works fine with manual REPL
   ```
    - foreach Row *** FAILED *** (15 milliseconds)
   [info]   java.io.NotSerializableException: org.scalatest.Engine
   [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
   [info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   [info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   [info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   [info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   [info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   [info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   [info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
   [info]   at org.apache.spark.util.Utils$.serialize(Utils.scala:126)
   [info]   at org.apache.spark.sql.streaming.DataStreamWriter.foreach(DataStreamWriter.scala:226)
   [info]   at org.apache.spark.sql.streaming.StreamingQuerySuite.$anonfun$new$12(StreamingQuerySuite.scala:204)
   [info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfterAll$$super$run(StreamingQuerySuite.scala:35)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.sql.streaming.StreamingQuerySuite.run(StreamingQuerySuite.scala:35)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   [info]   at java.lang.Thread.run(Thread.java:750)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1194152295


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +566,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var connectForeachWriter: ForeachWriter[Any] = null
+
+  private var connectForeachWriterEncoder:
+    Either[ExpressionEncoder[Any], InternalRow => Any] = null
+

Review Comment:
   @rangadi 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1196867820


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -216,6 +216,7 @@ message WriteStreamOperationStart {
 message StreamingForeachWriter {
   oneof writer {
     PythonUDF python_writer = 1;
+    ScalarScalaUDF scala_writer = 2;

Review Comment:
   It looks like you only needed bytes, would it be enough to only use bytes?
   e.g.
   ```
   oneof writer {
     bytes python_writer = 1;
     bytes scala_writer = 2;
   }
   ```



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -202,6 +208,28 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
     this
   }
 
+  /**
+   * Sets the output of the streaming query to be processed using the provided writer object.
+   * object. See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
+   * semantics.
+   * @since 3.5.0
+   */
+  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    // ds.encoder equal to UnboundRowEncoder means type parameter T is Row,
+    // which is not able to be serialized. Server will detect this and use default encoder.
+    val rowEncoder = if (ds.encoder != UnboundRowEncoder) {

Review Comment:
   Can you add a TODO and a ticket for us [here](https://issues.apache.org/jira/browse/SPARK-42554)



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2386,10 +2393,26 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.rowEncoder == null) {
+          // rowEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(
+            foreachWriterPkt.rowEncoder.asInstanceOf[AgnosticEncoder[Any]])

Review Comment:
   If you could write the logic to work based on the schema of the dataset, pls do. The user's writer func would always expect a T, we need cast between row and T. It would be the best if we could not unpack the scala bytes (writer + encoder) before the execution, as the unpacking should be isolated.
   
   However we know this issue with encoder and current impl and udfs. It is not necessary this PR's problem to address this issue.



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import com.google.protobuf.ByteString
+import java.io.{InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, rowEncoder: AgnosticEncoder[_])

Review Comment:
   Suggest rename rowEncoder -> datasetEncoder.
   
   DataStreamWriter SQL impl needed a dataset as input, then the dataset mostly is used to provide the exprEncoder. This makes sense that we directly send the encoder with the writer.
   
   You probably do not need the other methods as long as you define this class as `Serializable`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1203094949


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import com.google.protobuf.ByteString
+import java.io.{InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, rowEncoder: AgnosticEncoder[_])

Review Comment:
   > We can carry these two fields in the protobuf
   For this I think if we are reusing scalaUDF it's hard. We might could use Tuple2 here



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import com.google.protobuf.ByteString
+import java.io.{InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, rowEncoder: AgnosticEncoder[_])

Review Comment:
   > We can carry these two fields in the protobuf
   
   For this I think if we are reusing scalaUDF it's hard. We might could use Tuple2 here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205846938


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, datasetEncoder: AgnosticEncoder[_])

Review Comment:
   Fix class name, should not start with lower case.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   `Either` implementation is not needed. Only `Left` is used. 
   Actually this change is not needed at all, I think. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205890454


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   Also I think even with only generic types the encoder is still needed. Let me test it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1210149861


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -163,13 +165,106 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       assert(!terminated)
 
       q.stop()
-      // TODO (SPARK-43032): uncomment below
-      // eventually(timeout(1.minute)) {
-      // q.awaitTermination()
-      // }
+      eventually(timeout(1.minute)) {
+        q.awaitTermination()
+      }
+    }
+  }
+
+  class TestForeachWriter[T] extends ForeachWriter[T] {

Review Comment:
   Can you move this class outside of the test class e.g.
   ```
   class XXSuite {}
   class TestForeachWriter[T] ... r{}
   ```
   And see if the serdes problem is resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #41129:
URL: https://github.com/apache/spark/pull/41129#issuecomment-1543425929

   This code rn is messy because it's based on unmerged PR: https://github.com/apache/spark/pull/41026
   
   Only need to look at the part I commented


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205870558


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   We can make this change when we have that working. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205894156


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   Confirmed, I make the encoder to be null and run the `int` code
   ```
   Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Integer
   ```
   
   This is because on the server, the ds is always df (Dataset[Row]), and therefore ds.expenc doesn't cast the row to Int, which the `process` method uses



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   Confirmed, I make the encoder to be null and run the `int` code
   ```
   Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Integer
   ```
   
   This is because on the server, the ds is always df (Dataset[Row]), and therefore ds.expenc doesn't cast the row to Int, which the `process` method expects



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1197276979


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -216,6 +216,7 @@ message WriteStreamOperationStart {
 message StreamingForeachWriter {
   oneof writer {
     PythonUDF python_writer = 1;
+    ScalarScalaUDF scala_writer = 2;

Review Comment:
   There was discussion about reusing `PythonUDF` or not here. We decided to reuse. Going with the same policy with Scala.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1203094949


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import com.google.protobuf.ByteString
+import java.io.{InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, rowEncoder: AgnosticEncoder[_])

Review Comment:
   > We can carry these two fields in the protobuf
   
   For this I think if we are reusing scalaUDF it's hard. We might could use Tuple2 however



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1203099036


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -216,6 +216,7 @@ message WriteStreamOperationStart {
 message StreamingForeachWriter {
   oneof writer {
     PythonUDF python_writer = 1;
+    ScalarScalaUDF scala_writer = 2;

Review Comment:
   @zhenlineo For python at least pythonVersion is needed so I think it's better to just use PythonUDF? Then I think it's probably better to use scalaUDF for same reason. 
   
   > Would you add more fields in the UDFs or in this StreamingForeachWriter?
   
   I don't think so. We don't seem to update foreachWriter in the future.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205890454


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   Also I think even with only primitive types the encoder is still needed. Let me test it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1191450205


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2293,6 +2309,19 @@ class SparkConnectPlanner(val session: SparkSession) {
       writer.queryName(writeOp.getQueryName)
     }
 
+    if (writeOp.hasForeach) {
+      if (writeOp.getForeach.hasPythonWriter) {
+        val foreach = writeOp.getForeach.getPythonWriter
+        val pythonFcn = transformPythonForeachFunction(foreach)
+        writer.foreachPython(new PythonForeachWriter(pythonFcn, dataset.schema))
+      } else {
+        val scalaFcn = Utils.deserialize[ForeachWriter[Row]]( // TODO(wei): row? String?

Review Comment:
   Your code looks correct. Let me know if you have any other issues to run your tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #41129:
URL: https://github.com/apache/spark/pull/41129#issuecomment-1583617244

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1223658042


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -202,6 +208,28 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
     this
   }
 
+  /**
+   * Sets the output of the streaming query to be processed using the provided writer object.
+   * object. See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
+   * semantics.
+   * @since 3.5.0
+   */
+  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    // ds.encoder equal to UnboundRowEncoder means type parameter T is Row,
+    // which is not able to be serialized. Server will detect this and use default encoder.
+    val rowEncoder = if (ds.encoder != UnboundRowEncoder) {

Review Comment:
   The action item is item 1 in Zhen's approval note: https://github.com/apache/spark/pull/41129#pullrequestreview-1463092531
   
   `UnboundRowEncoder` needs special care here which is not very ideal and they want to update



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2445,10 +2451,24 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.datasetEncoder == null) {
+          // datasetEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(

Review Comment:
   Yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1223658122


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2445,10 +2451,24 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     if (writeOp.hasForeachWriter) {
-      val foreach = writeOp.getForeachWriter.getPythonWriter
-      val pythonFcn = transformPythonFunction(foreach)
-      writer.foreachImplementation(
-        new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      if (writeOp.getForeachWriter.hasPythonWriter) {
+        val foreach = writeOp.getForeachWriter.getPythonWriter
+        val pythonFcn = transformPythonFunction(foreach)
+        writer.foreachImplementation(
+          new PythonForeachWriter(pythonFcn, dataset.schema).asInstanceOf[ForeachWriter[Any]])
+      } else {
+        val foreachWriterPkt = unpackForeachWriter(writeOp.getForeachWriter.getScalaWriter)
+        val clientWriter = foreachWriterPkt.foreachWriter
+        if (foreachWriterPkt.datasetEncoder == null) {
+          // datasetEncoder is null means the client-side writer has type parameter Row,
+          // Since server-side dataset is always dataframe, here just use foreach directly.
+          writer.foreach(clientWriter.asInstanceOf[ForeachWriter[Row]])
+        } else {
+          val encoder = ExpressionEncoder(

Review Comment:
   Yes. This is because server side has all columns of data: https://github.com/apache/spark/blob/238ac57a679f0fc545697be6e407ed88c9f2407d/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2419 
   
   Here the `ofRows` method returns a DataFrame, which has all columns. 
   
   So as long as you call a `.as[xxx]` method on client side, there is a type mismatch and you need to call this encoder. Same applies to `foreach Int`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1205873302


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -451,13 +451,17 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     foreachImplementation(writer.asInstanceOf[ForeachWriter[Any]])
   }
 
-  private[sql] def foreachImplementation(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
+  private[sql] def foreachImplementation(writer: ForeachWriter[Any],
+      encoder: Either[ExpressionEncoder[Any], InternalRow => Any] = null): DataStreamWriter[T] = {

Review Comment:
   I think we need to be cautious here. Would it be a problem if we update this in the future. And a client with old code try to connect to the new server?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1197275368


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import com.google.protobuf.ByteString
+import java.io.{InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, rowEncoder: AgnosticEncoder[_])

Review Comment:
   Right. We can carry these two fields in the protobuf. No need to serialize them together. That way we don't need need this case class. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #41129: [SPARK-43133] Scala Client DataStreamWriter Foreach support

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #41129:
URL: https://github.com/apache/spark/pull/41129#discussion_r1203092310


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/foreachWriterPacket.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+import com.google.protobuf.ByteString
+import java.io.{InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
+
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
+/**
+ * A wrapper class around the foreachWriter and it's Input/Output [[AgnosticEncoder]](s).
+ *
+ * This class is shared between the client and the server to allow for serialization and
+ * deserialization of the JVM object.
+ *
+ * @param foreachWriter
+ *   The actual foreachWriter from client
+ * @param rowEncoder
+ *   An [[AgnosticEncoder]] for the input row
+ */
+@SerialVersionUID(3882541391565582579L)
+case class foreachWriterPacket(foreachWriter: AnyRef, rowEncoder: AgnosticEncoder[_])

Review Comment:
   @rangadi This is basically because we also do a similar thing for UDF 
   https://github.com/apache/spark/blob/master/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfPacket.scala#L39



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org