Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/03/29 18:08:23 UTC

[GitHub] [spark] rangadi commented on a diff in pull request #40586: [SPARK-42939][SS][CONNECT] Core streaming Python API for Spark Connect

rangadi commented on code in PR #40586:
URL: https://github.com/apache/spark/pull/40586#discussion_r1152315438


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1969,6 +2014,136 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  def handleWriteStreamOperation(
+      writeOp: WriteStreamOperation,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    val plan = transformRelation(writeOp.getInput)
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val writer = dataset.writeStream
+
+    if (writeOp.getFormat.nonEmpty) {
+      writer.format(writeOp.getFormat)
+    }
+
+    writer.options(writeOp.getOptionsMap)
+
+    if (writeOp.getPartitioningColumnNamesCount > 0) {
+      writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*)
+    }
+
+    writeOp.getTriggerCase match {
+      case TriggerCase.PROCESSING_TIME_INTERVAL =>
+        writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval))
+      case TriggerCase.AVAILABLE_NOW =>
+        writer.trigger(Trigger.AvailableNow())
+      case TriggerCase.ONE_TIME =>
+        writer.trigger(Trigger.Once())
+      case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
+        writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+      case TriggerCase.TRIGGER_NOT_SET =>
+    }
+
+    if (writeOp.getOutputMode.nonEmpty) {
+      writer.outputMode(writeOp.getOutputMode)
+    }
+
+    if (writeOp.getQueryName.nonEmpty) {
+      writer.queryName(writeOp.getQueryName)
+    }
+
+    val query = writeOp.getPath match {
+      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+      case "" => writer.start()
+      case path => writer.start(path)
+    }
+
+    val result = StreamingQueryStartResult
+      .newBuilder()
+      .setId(query.id.toString)
+      .setRunId(query.runId.toString)
+      .setName(Option(query.name).getOrElse(""))
+      .build()
+
+    responseObserver.onNext(
+      ExecutePlanResponse
+        .newBuilder()
+        .setSessionId(sessionId)
+        .setStreamingQueryStartResult(result)
+        .build())
+  }
+
+  def handleStreamingQueryCommand(
+      command: StreamingQueryCommand,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+
+    val id = command.getId
+
+    val respBuilder = StreamingQueryCommandResult
+      .newBuilder()
+      .setId(command.getId)
+
+    val query = Option(session.streams.get(command.getId)).getOrElse {
+      throw new IllegalArgumentException(s"Streaming query $id is not found")
+      // TODO(SPARK-42962): Handle this better. May be cache stopped queries for a few minutes.
+    }
+
+    System.currentTimeMillis()

Review Comment:
   Yep. Removed. 


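[Editorial note, not part of the original review thread] The handleWriteStreamOperation handler quoted above maps the fields of WriteStreamOperation (format, options, partitioning columns, trigger, output mode, query name, path/table) onto DataStreamWriter and returns a StreamingQueryStartResult with the query's id, runId, and name. A minimal client-side sketch of the call it serves, assuming the Connect Python client added by this PR mirrors the existing PySpark DataStreamWriter/StreamingQuery API (the sc://localhost URL and the paths below are placeholders):

    from pyspark.sql import SparkSession

    # Placeholder Connect endpoint; any reachable Spark Connect server works.
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    df = spark.readStream.format("rate").load()

    # Each builder call corresponds to a field the planner reads from
    # WriteStreamOperation: format, options, trigger, outputMode, queryName, path.
    query = (
        df.writeStream
          .format("parquet")
          .option("checkpointLocation", "/tmp/ckpt")   # placeholder path
          .trigger(processingTime="10 seconds")        # -> PROCESSING_TIME_INTERVAL
          .outputMode("append")
          .queryName("rate_to_parquet")
          .start("/tmp/out")                           # non-empty path -> writer.start(path)
    )

    # StreamingQueryStartResult carries these back to the client.
    print(query.id, query.runId, query.name)
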

##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1969,6 +2014,136 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  def handleWriteStreamOperation(
+      writeOp: WriteStreamOperation,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    val plan = transformRelation(writeOp.getInput)
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val writer = dataset.writeStream
+
+    if (writeOp.getFormat.nonEmpty) {
+      writer.format(writeOp.getFormat)
+    }
+
+    writer.options(writeOp.getOptionsMap)
+
+    if (writeOp.getPartitioningColumnNamesCount > 0) {
+      writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*)
+    }
+
+    writeOp.getTriggerCase match {
+      case TriggerCase.PROCESSING_TIME_INTERVAL =>
+        writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval))
+      case TriggerCase.AVAILABLE_NOW =>
+        writer.trigger(Trigger.AvailableNow())
+      case TriggerCase.ONE_TIME =>
+        writer.trigger(Trigger.Once())
+      case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
+        writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+      case TriggerCase.TRIGGER_NOT_SET =>
+    }
+
+    if (writeOp.getOutputMode.nonEmpty) {
+      writer.outputMode(writeOp.getOutputMode)
+    }
+
+    if (writeOp.getQueryName.nonEmpty) {
+      writer.queryName(writeOp.getQueryName)
+    }
+
+    val query = writeOp.getPath match {
+      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+      case "" => writer.start()
+      case path => writer.start(path)
+    }
+
+    val result = StreamingQueryStartResult
+      .newBuilder()
+      .setId(query.id.toString)
+      .setRunId(query.runId.toString)
+      .setName(Option(query.name).getOrElse(""))
+      .build()
+
+    responseObserver.onNext(
+      ExecutePlanResponse
+        .newBuilder()
+        .setSessionId(sessionId)
+        .setStreamingQueryStartResult(result)
+        .build())
+  }
+
+  def handleStreamingQueryCommand(
+      command: StreamingQueryCommand,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+
+    val id = command.getId
+
+    val respBuilder = StreamingQueryCommandResult
+      .newBuilder()
+      .setId(command.getId)
+
+    val query = Option(session.streams.get(command.getId)).getOrElse {
+      throw new IllegalArgumentException(s"Streaming query $id is not found")
+      // TODO(SPARK-42962): Handle this better. May be cache stopped queries for a few minutes.
+    }
+
+    System.currentTimeMillis()
+    command.getCommandTypeCase match {
+      case StreamingQueryCommand.CommandTypeCase.STATUS =>
+        val recentProgress: Seq[String] = command.getStatus.getRecentProgressLimit match {
+          case 0 => Seq.empty
+          case limit if limit < 0 =>
+            query.recentProgress.map(_.json) // All the cached progresses.
+          case limit => query.recentProgress.takeRight(limit).map(_.json) // Most recent
+        }
+
+        val queryStatus = query.status
+
+        val statusResult = StreamingQueryCommandResult.StatusResult
+          .newBuilder()
+          .setStatusMessage(queryStatus.message)
+          .setIsDataAvailable(queryStatus.isDataAvailable)
+          .setIsTriggerActive(queryStatus.isTriggerActive)
+          .setIsActive(query.isActive)
+          .addAllRecentProgressJson(recentProgress.asJava)
+          .build()
+
+        respBuilder.setStatus(statusResult)
+
+      case StreamingQueryCommand.CommandTypeCase.STOP =>
+        query.stop()
+
+      case StreamingQueryCommand.CommandTypeCase.PROCESS_ALL_AVAILABLE =>
+        query.processAllAvailable()
+
+      case StreamingQueryCommand.CommandTypeCase.EXPLAIN =>
+        val result = query match {
+          case q: StreamExecution => q.explainInternal(command.getExplain.getExtended)

Review Comment:
   Yeah, we could. I was wondering about it too.


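[Editorial note, not part of the original review thread] The handleStreamingQueryCommand handler quoted above serves the per-query operations (STATUS, STOP, PROCESS_ALL_AVAILABLE, EXPLAIN) that back the client-side StreamingQuery methods. A rough usage sketch, assuming the Connect Python client exposes the same surface as the classic PySpark StreamingQuery, where query is the handle returned by start() in the previous sketch:

    # Status and cached progress: served by the STATUS command case.
    print(query.status)          # message, isDataAvailable, isTriggerActive
    print(query.recentProgress)  # recent progress entries, capped by the server-side limit

    # Block until all currently available input is processed (PROCESS_ALL_AVAILABLE).
    query.processAllAvailable()

    # Explain the streaming plan (EXPLAIN); extended=True maps to getExtended.
    query.explain(extended=True)

    # Stop the query (STOP).
    query.stop()
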

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

