You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/05/15 23:38:54 UTC

[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API

rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194455270


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    this.source = SOURCE_NAME_FOREACH
+    this.foreachWriter = if (writer != null) {
+      ds.sparkSession.sparkContext.clean(writer.asInstanceOf[ForeachWriter[Any]])
+    } else {
+      throw new IllegalArgumentException("foreach writer cannot be null")
+    }
+    this
+  }
+
+  private[sql] def foreachConnect(writer: ForeachWriter[Any]): DataStreamWriter[T] = {

Review Comment:
   Rename it to `foreachImplementation()`? 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -40,7 +41,11 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.Streami
 import org.apache.spark.connect.proto.WriteStreamOperationStart
 import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
 import org.apache.spark.ml.{functions => MLFunctions}
+<<<<<<< HEAD

Review Comment:
   Fix this: a leftover merge-conflict marker (`<<<<<<< HEAD`) was committed here and needs to be removed.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    this.source = SOURCE_NAME_FOREACH

Review Comment:
   Call `foreachConnect()` here, rather than reimplementing it.



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
 
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
-      // DataStreamReader
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
-      ),
-
       // DataStreamWriter
       ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
-      ),
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),

Review Comment:
   Actually we don't need this. The method is private to `[sql]`.



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
 
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
-      // DataStreamReader
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
-      ),
-
       // DataStreamWriter
       ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
-      ),
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),

Review Comment:
   This name should be in sync with the method name in DataStreamWriter. I like `foreachPython`. 
   
   Also add a comment here that this is not a public API.



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
     string path = 11;
     string table_name = 12;
   }
+
+  optional Foreach foreach = 13;
+}
+
+message Foreach {
+  // (Required) The encoded commands of the Python foreach function
+  bytes command = 1;

Review Comment:
   Sure, we can go with your current approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org