Posted to reviews@spark.apache.org by "grundprinzip (via GitHub)" <gi...@apache.org> on 2023/03/30 07:38:01 UTC

[GitHub] [spark] grundprinzip commented on a diff in pull request #40586: [SPARK-42939][SS][CONNECT] Core streaming Python API for Spark Connect

grundprinzip commented on code in PR #40586:
URL: https://github.com/apache/spark/pull/40586#discussion_r1152826039


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,97 @@ message WriteOperationV2 {
   // (Optional) A condition for overwrite saving mode
   Expression overwrite_condition = 8;
 }
+
+message WriteStreamOperation {
+
+  // (Required) The output of the `input` streaming relation will be written.
+  Relation input = 1;
+
+  string format = 2;
+  map<string, string> options = 3;
+  repeated string partitioning_column_names = 4;
+
+  oneof trigger {
+    string processing_time_interval = 5;
+    bool available_now = 6;
+    bool one_time = 7;
+    string continuous_checkpoint_interval = 8;
+  }
+
+  string output_mode = 9;

Review Comment:
   this sounds like an enum?
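A minimal sketch of what an enum would buy here, in Python for illustration only. `OutputMode` and `parse_output_mode` are hypothetical names, not part of this PR; the three values are the output modes Structured Streaming documents (append, complete, update). The point is that unknown values are rejected at the boundary instead of travelling as a free-form string.

```python
from enum import Enum


class OutputMode(Enum):
    """Hypothetical mirror of an output-mode enum (not taken from the PR)."""

    APPEND = "append"
    COMPLETE = "complete"
    UPDATE = "update"


def parse_output_mode(value: str) -> OutputMode:
    """Map a user-supplied string onto the enum, rejecting unknown values early."""
    try:
        return OutputMode(value.strip().lower())
    except ValueError:
        allowed = ", ".join(mode.value for mode in OutputMode)
        raise ValueError(
            f"Unknown output mode {value!r}; expected one of: {allowed}"
        ) from None
```

With this shape a typo such as `"apend"` fails on the client with a clear message rather than on the server when the query starts.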



##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -729,6 +736,18 @@ message WithColumns {
   repeated Expression.Alias aliases = 2;
 }
 
+message WithWatermark {
+
+  // (Required) The input relation
+  Relation input = 1;
+
+  // (Required)
+  string event_time = 2;

Review Comment:
   string or long?



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -298,6 +298,10 @@ message ExecutePlanResponse {
     // Special case for executing SQL commands.
     SqlCommandResult sql_command_result = 5;
 
+    StreamingQueryStartResult streaming_query_start_result = 8;
+
+    StreamingQueryCommandResult streaming_query_command_result = 9;

Review Comment:
   Please provide some basic doc.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -762,22 +779,32 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformReadRel(rel: proto.Read): LogicalPlan = {
+
+    def parseSchema(schema: String): StructType = {
+      DataType.parseTypeWithFallback(
+        schema,
+        StructType.fromDDL,
+        fallbackParser = DataType.fromJson) match {
+        case s: StructType => s
+        case other => throw InvalidPlanInput(s"Invalid schema $other")
+      }
+    }

Review Comment:
   Since you're moving it a couple of lines up anyway, shouldn't we either make this a proper private method or add it as a helper method to `StructType`?



##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -729,6 +736,18 @@ message WithColumns {
   repeated Expression.Alias aliases = 2;
 }
 
+message WithWatermark {
+
+  // (Required) The input relation
+  Relation input = 1;
+
+  // (Required)
+  string event_time = 2;
+
+  // (Required)
+  string delay_threshold = 3;

Review Comment:
   sounds like a numeric?
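To make the trade-off concrete, here is a rough Python sketch of what the client would have to do if the field were numeric (say, milliseconds) instead of a string such as "10 minutes". `delay_to_millis` is a hypothetical helper and it only handles a small, assumed subset of units; the interval syntax Spark actually accepts is richer, which may be an argument for keeping the string and parsing it on the server.

```python
import re

# Assumed unit table for illustration; not the full interval grammar.
_UNIT_MS = {
    "millisecond": 1,
    "second": 1_000,
    "minute": 60_000,
    "hour": 3_600_000,
    "day": 86_400_000,
}

# "<integer> <unit>" with an optional plural "s", e.g. "10 minutes".
_PATTERN = re.compile(r"^\s*(\d+)\s+([a-z]+?)s?\s*$", re.IGNORECASE)


def delay_to_millis(delay: str) -> int:
    """Convert a simple '<n> <unit>' delay string to milliseconds."""
    match = _PATTERN.match(delay)
    if match is None:
        raise ValueError(f"Cannot parse delay threshold: {delay!r}")
    amount, unit = match.groups()
    unit = unit.lower()
    if unit not in _UNIT_MS:
        raise ValueError(f"Unsupported unit in delay threshold: {delay!r}")
    return int(amount) * _UNIT_MS[unit]
```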



##########
python/pyspark/sql/connect/session.py:
##########
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+

Review Comment:
   nit: rm



##########
python/pyspark/sql/connect/streaming/__init__.py:
##########
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql.connect.streaming.query import StreamingQuery  # StreamingQueryManager  noqa: F401
+from pyspark.sql.connect.streaming.readwriter import DataStreamReader, DataStreamWriter  # noqa: F401
+# from pyspark.sql.connect.streaming.listener import StreamingQueryListener  # noqa: F401

Review Comment:
   ?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1969,6 +2014,135 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  def handleWriteStreamOperation(
+      writeOp: WriteStreamOperation,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    val plan = transformRelation(writeOp.getInput)
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val writer = dataset.writeStream
+
+    if (writeOp.getFormat.nonEmpty) {
+      writer.format(writeOp.getFormat)
+    }
+
+    writer.options(writeOp.getOptionsMap)
+
+    if (writeOp.getPartitioningColumnNamesCount > 0) {
+      writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*)
+    }
+
+    writeOp.getTriggerCase match {
+      case TriggerCase.PROCESSING_TIME_INTERVAL =>
+        writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval))
+      case TriggerCase.AVAILABLE_NOW =>
+        writer.trigger(Trigger.AvailableNow())
+      case TriggerCase.ONE_TIME =>
+        writer.trigger(Trigger.Once())
+      case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
+        writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+      case TriggerCase.TRIGGER_NOT_SET =>
+    }
+
+    if (writeOp.getOutputMode.nonEmpty) {
+      writer.outputMode(writeOp.getOutputMode)
+    }
+
+    if (writeOp.getQueryName.nonEmpty) {
+      writer.queryName(writeOp.getQueryName)
+    }
+
+    val query = writeOp.getPath match {
+      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+      case "" => writer.start()
+      case path => writer.start(path)
+    }
+
+    val result = StreamingQueryStartResult
+      .newBuilder()
+      .setId(query.id.toString)
+      .setRunId(query.runId.toString)
+      .setName(Option(query.name).getOrElse(""))
+      .build()
+
+    responseObserver.onNext(
+      ExecutePlanResponse
+        .newBuilder()
+        .setSessionId(sessionId)
+        .setStreamingQueryStartResult(result)
+        .build())
+  }
+
+  def handleStreamingQueryCommand(
+      command: StreamingQueryCommand,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+
+    val id = command.getId
+
+    val respBuilder = StreamingQueryCommandResult
+      .newBuilder()
+      .setId(command.getId)
+
+    val query = Option(session.streams.get(command.getId)).getOrElse {
+      throw new IllegalArgumentException(s"Streaming query $id is not found")
+      // TODO(SPARK-42962): Handle this better. May be cache stopped queries for a few minutes.
+    }
+
+    command.getCommandTypeCase match {
+      case StreamingQueryCommand.CommandTypeCase.STATUS =>
+        val recentProgress: Seq[String] = command.getStatus.getRecentProgressLimit match {
+          case 0 => Seq.empty
+          case limit if limit < 0 =>
+            query.recentProgress.map(_.json) // All the cached progresses.
+          case limit => query.recentProgress.takeRight(limit).map(_.json) // Most recent
+        }
+
+        val queryStatus = query.status
+
+        val statusResult = StreamingQueryCommandResult.StatusResult
+          .newBuilder()
+          .setStatusMessage(queryStatus.message)
+          .setIsDataAvailable(queryStatus.isDataAvailable)
+          .setIsTriggerActive(queryStatus.isTriggerActive)
+          .setIsActive(query.isActive)
+          .addAllRecentProgressJson(recentProgress.asJava)
+          .build()
+
+        respBuilder.setStatus(statusResult)
+
+      case StreamingQueryCommand.CommandTypeCase.STOP =>
+        query.stop()
+
+      case StreamingQueryCommand.CommandTypeCase.PROCESS_ALL_AVAILABLE =>
+        query.processAllAvailable()

Review Comment:
   For these two cases, we don't add anything to the response?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1681,6 +1722,10 @@ class SparkConnectPlanner(val session: SparkSession) {
         handleCommandPlugin(command.getExtension)
       case proto.Command.CommandTypeCase.SQL_COMMAND =>
         handleSqlCommand(command.getSqlCommand, sessionId, responseObserver)
+      case proto.Command.CommandTypeCase.WRITE_STREAM_OPERATION =>
+        handleWriteStreamOperation(command.getWriteStreamOperation, sessionId, responseObserver)
+      case proto.Command.CommandTypeCase.STREAMING_QUERY_COMMAND =>
+        handleStreamingQueryCommand(command.getStreamingQueryCommand, sessionId, responseObserver)

Review Comment:
   Luckily it pays off that I had to plumb the responseObserver through to the command handler :)



##########
python/pyspark/sql/connect/readwriter.py:
##########
@@ -37,7 +37,7 @@
     from pyspark.sql.connect._typing import ColumnOrName, OptionalPrimitiveType
     from pyspark.sql.connect.session import SparkSession
 
-__all__ = ["DataFrameReader", "DataFrameWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter", "OptionUtils", "to_str"]

Review Comment:
   Are the changes here really needed?



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,97 @@ message WriteOperationV2 {
   // (Optional) A condition for overwrite saving mode
   Expression overwrite_condition = 8;
 }
+
+message WriteStreamOperation {

Review Comment:
   Optional is always implied by proto. However, explicitly declaring fields as optional generates code that lets you easily check `hasField`, which improves the overall readability of the code.
   
   In addition, for integral types it lets you easily distinguish between unset and default values.
   
   It would be great if you could be a good citizen and improve the documentation for your proto messages. Feel free to do this in the last round of the review, when the protos are finalized.
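The unset-versus-default point can be illustrated with a plain-Python analogy. This is not generated protobuf code: `StatusRequest`, `has_recent_progress_limit`, and `describe` are hypothetical stand-ins, and `None` plays the role of "field not set". Without explicit presence, an integral field that was never set reads back as 0 and the two cases below collapse into one.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StatusRequest:
    """Hypothetical stand-in for a message with one optional integral field."""

    # None models "not set"; 0 is a legitimate explicit value.
    recent_progress_limit: Optional[int] = None

    def has_recent_progress_limit(self) -> bool:
        """Analogue of the presence check generated for explicit optional fields."""
        return self.recent_progress_limit is not None


def describe(request: StatusRequest) -> str:
    """Show how a handler can treat 'unset' and 'explicit 0' differently."""
    if not request.has_recent_progress_limit():
        return "unset: fall back to the server default"
    if request.recent_progress_limit == 0:
        return "explicit 0: return no progress entries"
    return f"explicit limit: {request.recent_progress_limit}"
```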



##########
python/pyspark/sql/connect/session.py:
##########
@@ -489,10 +495,6 @@ def sparkContext(self) -> Any:
     def streams(self) -> Any:
         raise NotImplementedError("streams() is not implemented.")
 
-    @property
-    def readStream(self) -> Any:
-        raise NotImplementedError("readStream() is not implemented.")
-

Review Comment:
   why the move?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

