You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/05/15 23:38:54 UTC
[GitHub] [spark] rangadi commented on a diff in pull request #41026: [SPARK-43132] [SS] [CONNECT] Python Client DataStreamWriter foreach() API
rangadi commented on code in PR #41026:
URL: https://github.com/apache/spark/pull/41026#discussion_r1194455270
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* @since 2.0.0
*/
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+ this.source = SOURCE_NAME_FOREACH
+ this.foreachWriter = if (writer != null) {
+ ds.sparkSession.sparkContext.clean(writer.asInstanceOf[ForeachWriter[Any]])
+ } else {
+ throw new IllegalArgumentException("foreach writer cannot be null")
+ }
+ this
+ }
+
+ private[sql] def foreachConnect(writer: ForeachWriter[Any]): DataStreamWriter[T] = {
Review Comment:
Rename it to `foreachImplementation()`?
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -40,7 +41,11 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.Streami
import org.apache.spark.connect.proto.WriteStreamOperationStart
import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
import org.apache.spark.ml.{functions => MLFunctions}
+<<<<<<< HEAD
Review Comment:
Fix: this `<<<<<<< HEAD` line is a leftover merge-conflict marker; resolve the conflict and remove it.
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala:
##########
@@ -446,6 +448,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* @since 2.0.0
*/
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+ this.source = SOURCE_NAME_FOREACH
Review Comment:
Call `foreachConnect()` here, rather than reimplementing it.
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
// TypedColumn
ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
- // DataStreamReader
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
- ),
-
// DataStreamWriter
ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
- ),
+ "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),
Review Comment:
Actually we don't need this. The method is private to `[sql]`.
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -226,16 +226,9 @@ object CheckConnectJvmClientCompatibility {
// TypedColumn
ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
-
- // DataStreamReader
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144)
- ),
-
// DataStreamWriter
ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
- ),
+ "org.apache.spark.sql.streaming.DataStreamWriter.foreachPython"),
Review Comment:
This name should be in sync with the method name in DataStreamWriter. I like `foreachPython`.
Also add a comment here that this is not public API.
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -209,6 +209,15 @@ message WriteStreamOperationStart {
string path = 11;
string table_name = 12;
}
+
+ optional Foreach foreach = 13;
+}
+
+message Foreach {
+ // (Required) The encoded commands of the Python foreach function
+ bytes command = 1;
Review Comment:
Sure, we can go with your current approach.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org