Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/05/03 17:35:59 UTC

[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API

WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1183993090


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
 
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
-      // DataStreamReader
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
-      ),
-
       // DataStreamWriter
       ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
-      ),
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),

Review Comment:
   Please see my change to the core `DataStreamWriter` below.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -352,10 +355,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         catalogTable = catalogTable)
       resultDf.createOrReplaceTempView(query.name)
       query
-    } else if (source == SOURCE_NAME_FOREACH) {
+    } else if (source == SOURCE_NAME_FOREACH && foreachWriter != null) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
       val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
       startQuery(sink, extraOptions, catalogTable = catalogTable)
+    } else if (source == SOURCE_NAME_FOREACH) {
+      assertNotPartitioned(SOURCE_NAME_FOREACH)
+      val sink = new ForeachWriterTable[UnsafeRow](
+        pythonForeachWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))

Review Comment:
   This is a copy of the logic from
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala#L96-L108
   
   See my comment below.
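   
   For context, a rough, self-contained sketch of that dispatch is below. The stub types and the `makeSink` name are hypothetical; the `PythonForeachWriter` branch mirrors the code added in this diff, while the non-Python branch is inferred from the existing `ForeachWriterTable[T](foreachWriter, ds.exprEnc)` call, so treat it as a paraphrase rather than the verbatim source.
   
   ```scala
   // Stub types only; hypothetical, simplified versions of the real classes.
   object ForeachSinkDispatchSketch {
     class InternalRow
     class UnsafeRow extends InternalRow
     class ExpressionEncoder[T]
     abstract class ForeachWriter[T]
     class PythonForeachWriter extends ForeachWriter[UnsafeRow]
   
     // The sink keeps either a typed encoder or a raw InternalRow => T converter.
     class ForeachWriterTable[T](
         writer: ForeachWriter[T],
         converter: Either[ExpressionEncoder[T], InternalRow => T])
   
     // Python writers bypass the encoder and consume UnsafeRow directly;
     // everything else goes through the typed encoder.
     def makeSink[T](writer: ForeachWriter[T], encoder: ExpressionEncoder[T]): ForeachWriterTable[_] =
       writer match {
         case pythonWriter: PythonForeachWriter =>
           new ForeachWriterTable[UnsafeRow](
             pythonWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))
         case _ =>
           new ForeachWriterTable[T](writer, Left(encoder))
       }
   }
   ```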



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var foreachWriter: ForeachWriter[T] = null
 
+  private var pythonForeachWriter: PythonForeachWriter = null
+

Review Comment:
   I definitely think a discussion is needed here. I added this field because `foreachWriter` cannot be reused: its type parameter is the same as the `DataStreamWriter`'s, which is `Row`, but `PythonForeachWriter` extends `ForeachWriter[UnsafeRow]`:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
   
   Therefore I can't simply create a `PythonForeachWriter` and call `writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
   
   The original caller is in Python:
   https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
   
   Maybe that circumvents this type check somehow; I'm also very interested if anyone knows why that works.
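   
   To make the type clash concrete, here is a minimal, self-contained sketch with stub types (hypothetical, simplified versions of the real Spark classes) of why `writer.foreach(pythonForeachWriter)` cannot type-check:
   
   ```scala
   // Stub types only; not the real Spark classes.
   object ForeachTypeMismatchSketch {
     class Row
     class UnsafeRow
   
     abstract class ForeachWriter[T] { def process(value: T): Unit }
   
     // Like the real PythonForeachWriter, this one is pinned to UnsafeRow.
     class PythonForeachWriter extends ForeachWriter[UnsafeRow] {
       def process(value: UnsafeRow): Unit = ()
     }
   
     class DataStreamWriter[T] {
       private var foreachWriter: ForeachWriter[T] = _
       // foreach() only accepts a writer whose element type matches T.
       def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
         foreachWriter = writer
         this
       }
     }
   
     val writer = new DataStreamWriter[Row]     // T = Row, as on the SparkConnectPlanner side
     // writer.foreach(new PythonForeachWriter) // does not compile: ForeachWriter[UnsafeRow]
     //                                         // is not a ForeachWriter[Row], hence the
     //                                         // separate pythonForeachWriter field above
   }
   ```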



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

