Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/05/03 17:35:59 UTC
[GitHub] [spark] WweiL commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Add DataStreamWriter foreach() API
WweiL commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1183993090
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
// TypedColumn
ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
- // DataStreamReader
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
- ),
-
// DataStreamWriter
ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
- ),
+ "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),
Review Comment:
Please see my change in the core `DataStreamWriter`.
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -352,10 +355,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
catalogTable = catalogTable)
resultDf.createOrReplaceTempView(query.name)
query
- } else if (source == SOURCE_NAME_FOREACH) {
+ } else if (source == SOURCE_NAME_FOREACH && foreachWriter != null) {
assertNotPartitioned(SOURCE_NAME_FOREACH)
val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
startQuery(sink, extraOptions, catalogTable = catalogTable)
+ } else if (source == SOURCE_NAME_FOREACH) {
+ assertNotPartitioned(SOURCE_NAME_FOREACH)
+ val sink = new ForeachWriterTable[UnsafeRow](
+ pythonForeachWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))
Review Comment:
This is copied from:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala#L96-L108
See my comment below.
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -534,6 +552,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private var foreachWriter: ForeachWriter[T] = null
+ private var pythonForeachWriter: PythonForeachWriter = null
+
Review Comment:
I definitely think a discussion is needed here. I added this field because `foreachWriter` cannot be reused: its type parameter is the same as the `DataStreamWriter`'s, which is `Row`, but `PythonForeachWriter` extends `ForeachWriter[UnsafeRow]`:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala#L33-L34
Therefore I can't just create a `PythonForeachWriter` and call `writer.foreach(pythonForeachWriter)` in `SparkConnectPlanner`.
The original caller is in Python:
https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming/readwriter.py#L1309C24-L1315
Maybe that circumvents this type check somehow; I'm also very interested if anyone knows why that is.
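To make the invariance issue concrete, here is a minimal, hypothetical sketch (stub classes only, not the real Spark API) of why a `ForeachWriter[UnsafeRow]` cannot be passed to `foreach` on a `DataStreamWriter[Row]`, and why a dedicated `pythonForeachWriter` field sidesteps the problem:

```scala
// Simplified stand-ins for the Spark types; bodies are stubs, not real logic.
abstract class ForeachWriter[T] {
  def process(value: T): Unit
}

class Row
class UnsafeRow extends Row

// Fixed to UnsafeRow, mirroring
// org.apache.spark.sql.execution.python.PythonForeachWriter.
class PythonForeachWriter extends ForeachWriter[UnsafeRow] {
  override def process(value: UnsafeRow): Unit = ()
}

class DataStreamWriter[T] {
  private var foreachWriter: ForeachWriter[T] = null
  // Dedicated field: ForeachWriter is invariant in T, so when T = Row a
  // ForeachWriter[UnsafeRow] cannot be assigned to `foreachWriter`.
  private var pythonForeachWriter: PythonForeachWriter = null

  def foreach(writer: ForeachWriter[T]): this.type = {
    foreachWriter = writer
    this
  }

  // Separate entry point for the Python writer; calling
  // foreach(new PythonForeachWriter) on a DataStreamWriter[Row] would not
  // compile, because ForeachWriter[UnsafeRow] is not a ForeachWriter[Row].
  def foreachPython(writer: PythonForeachWriter): this.type = {
    pythonForeachWriter = writer
    this
  }

  def hasPythonWriter: Boolean = pythonForeachWriter != null
}
```

Under these assumptions, `new DataStreamWriter[Row].foreachPython(new PythonForeachWriter)` type-checks, while the direct `foreach` call does not; this matches the shape of the change in the diff above.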
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
For additional commands, e-mail: reviews-help@spark.apache.org