You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/05/01 19:13:14 UTC

[GitHub] [spark] rangadi commented on a diff in pull request #40861: [SPARK-43032][CONNECT][SS] Add Streaming query manager

rangadi commented on code in PR #40861:
URL: https://github.com/apache/spark/pull/40861#discussion_r1181810503


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -236,6 +237,9 @@ message StreamingQueryInstanceId {
   // will generate a unique run_id. Therefore, every time a query is restarted from
   // checkpoint, it will have the same `id` but different `run_id`s.
   string run_id = 2;
+
+  // (Optional) The name of this query.
+  optional string name = 3;

Review Comment:
   Why is this needed? Note that the query name is not part of the query's real identity. This can be an extra field, as in the response for 'WriteStreamOperation'.



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -321,6 +324,50 @@ message StreamingQueryCommandResult {
   }
 }
 
+// Commands for the streaming query manager.
+message StreamingQueryManagerCommand {
+
+  // See documentation for the corresponding API method in StreamingQueryManager.
+  oneof command {
+    // active() API, returns a list of active queries.
+    bool active = 1;
+    // get() API, returns the StreamingQuery identified by id.
+    GetQueryCommand get_query = 2;
+    // awaitAnyTermination() API, wait until any query terminates or timeout.
+    AwaitAnyTerminationCommand await_any_termination = 3;
+    // resetTerminated() API.
+    bool reset_terminated = 4;
+  }
+
+  message GetQueryCommand {
+    // (Required) The unique id of the query that it wants to get.
+    string id = 1;
+  }
+
+  message AwaitAnyTerminationCommand {
+    // (Optional) The waiting time in milliseconds to wait for any query to terminate.
+    optional int64 timeout_ms = 1;
+  }
+}
+
+// Response for commands on the streaming query manager.
+message StreamingQueryManagerCommandResult {
+  oneof result_type {
+    ActiveResult active = 1;
+    StreamingQueryInstanceId query = 2;
+    AwaitAnyTerminationResult await_any_termination = 3;
+    bool reset_terminated = 4;
+  }
+
+  message ActiveResult {
+    repeated StreamingQueryInstanceId active_queries = 1;

Review Comment:
   This can be the oneof field. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org