Posted to github@arrow.apache.org by "kou (via GitHub)" <gi...@apache.org> on 2023/06/09 08:45:03 UTC

[GitHub] [arrow] kou opened a new pull request, #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

kou opened a new pull request, #36009:
URL: https://github.com/apache/arrow/pull/36009

   ### Rationale for this change
   
   Currently, it is undefined whether a client can call DoGet more than once. Clients may want to retry requests, and servers may not want to persist a query result forever.
   
   ### What changes are included in this PR?
   
   Add an expiration time to FlightEndpoint. If present, clients may assume they can retry DoGet requests. Otherwise, clients should avoid retrying DoGet requests.
   
   This is not a full retry protocol.
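
The retry rule described above can be sketched as a small client-side check. This is a minimal C++ sketch under stated assumptions: `EndpointInfo` and `MayRetryDoGet` are hypothetical names used only for illustration (they are not the Arrow Flight API), and treating an endpoint as still valid at exactly its expiration time is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <optional>

// Hypothetical stand-in for the endpoint metadata; not the Arrow Flight type.
using Timestamp = std::chrono::system_clock::time_point;

struct EndpointInfo {
  // Unset means the server gives no guarantee that DoGet can be retried.
  std::optional<Timestamp> expiration_time;
};

// Returns true if a client may assume a DoGet retry is allowed at `now`.
inline bool MayRetryDoGet(const EndpointInfo& endpoint, Timestamp now) {
  if (!endpoint.expiration_time.has_value()) {
    return false;  // No expiration time: avoid retrying.
  }
  // Assumption: the endpoint is still usable at exactly its expiration time.
  return now <= *endpoint.expiration_time;
}
```

Passing `now` as a parameter, rather than reading the system clock inside the function, keeps the check deterministic in tests.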
   
   Also, add "pre-defined" actions to Flight RPC for working with result sets. These are pre-defined Protobuf messages with standardized encodings for use with DoAction:
   
     * CancelFlightInfo: Asynchronously cancel the execution of a distributed query. (Replaces the equivalent Flight SQL action.)
     * RefreshFlightEndpoint: Request an extension of the expiration of a FlightEndpoint.
     * CloseFlightInfo: Close a FlightInfo so that the server can clean up resources early.
   
   This lets the ADBC/JDBC/ODBC drivers for Flight SQL explicitly manage result set lifetimes. These can be used with Flight SQL as regular actions.
   
   ### Are these changes tested?
   
   Yes.
   
   ### Are there any user-facing changes?
   
   Yes.
   
   ### TODO
   
   * [ ] Update document
   * [ ] Implement this in Go
   * [ ] Clean up the C++ implementation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231711829


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   Ah, to avoid confusion with Arrow `Result`? 
   
   I guess Arrow also uses `Status`, so both are the same to me. If using `CancelStatus` is clearer to you, then it sounds good to me.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235641556


##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2 seconds expiration time
+/// 3. 3 seconds expiration time
+///
+/// The client can't read data from the first endpoint multiple times
+/// but can read data from the second and third endpoints. The client
+/// can't re-read data from the second endpoint 2 seconds later. The
+/// client can't re-read data from the third endpoint 3 seconds
+/// later.
+///
+/// The client can cancel a returned FlightInfo by pre-defined
+/// CancelFlightInfo action. The client can't read data from endpoints
+/// even within 3 seconds after the action.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
+/// action. The client can read data from endpoints multiple times
+/// for 10 more seconds after the action.
+///
+/// The client can close a returned FlightInfo explicitly by
+/// pre-defined CloseFlightInfo action. The client can't read data
+/// from endpoints even within 3 seconds after the action.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration_time)
+        : expiration_time(expiration_time) {}
+
+    std::optional<Timestamp> expiration_time;
+    uint32_t num_gets = 0;
+    bool cancelled = false;
+    bool closed = false;
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    status.num_gets++;
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }
+    } else if (action.type == ActionType::kCloseFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        if (!index_result.ok()) {
+          continue;
+        }
+        auto index = *index_result;
+        statuses_[index].closed = true;
+      }
+    } else if (action.type == ActionType::kRefreshFlightEndpoint.type) {
+      ARROW_ASSIGN_OR_RAISE(auto endpoint,
+                            FlightEndpoint::Deserialize(std::string_view(*action.body)));
+      ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(endpoint.ticket.ticket));
+      if (statuses_[index].cancelled) {
+        return Status::Invalid("Invalid flight: canceled: ", endpoint.ticket.ticket);
+      }
+      endpoint.ticket.ticket += ": refreshed (+ 10 seconds)";
+      endpoint.expiration_time = Timestamp::clock::now() + std::chrono::seconds{10};
+      statuses_[index].expiration_time = endpoint.expiration_time.value();
+      ARROW_ASSIGN_OR_RAISE(auto serialized, endpoint.SerializeToString());
+      results.push_back(Result{Buffer::FromString(std::move(serialized))});
+    } else {
+      return Status::Invalid("Unknown action: ", action.type);
+    }
+    *result_stream = std::make_unique<SimpleResultStream>(std::move(results));
+    return Status::OK();
+  }
+
+  Status ListActions(const ServerCallContext& context,
+                     std::vector<ActionType>* actions) override {
+    *actions = {
+        ActionType::kCancelFlightInfo,
+        ActionType::kCloseFlightInfo,
+        ActionType::kRefreshFlightEndpoint,
+    };
+    return Status::OK();
+  }
+
+ private:
+  void AddEndpoint(std::vector<FlightEndpoint>& endpoints, std::string ticket,
+                   std::optional<Timestamp> expiration_time) {
+    endpoints.push_back(FlightEndpoint{
+        {std::to_string(statuses_.size()) + ": " + ticket}, {}, expiration_time});
+    statuses_.emplace_back(expiration_time);
+  }
+
+  arrow::Result<uint32_t> ExtractIndexFromTicket(const std::string& ticket) {
+    auto index_string = arrow::internal::SplitString(ticket, ':', 2)[0];
+    uint32_t index;
+    if (!arrow::internal::ParseUnsigned(index_string.data(), index_string.length(),
+                                        &index)) {
+      return Status::KeyError("Invalid flight: no index: ", ticket);
+    }
+    if (index >= statuses_.size()) {
+      return Status::KeyError("Invalid flight: out of index: ", ticket);
+    }
+    return index;
+  }
+
+  std::shared_ptr<Schema> BuildSchema() {
+    return arrow::schema({arrow::field("number", arrow::uint32(), false)});
+  }
+
+  std::vector<EndpointStatus> statuses_;
+};
+
+/// \brief The expiration time scenario - DoGet.
+///
+/// This tests that the client can read data that isn't expired yet
+/// multiple times and can't read data after it's expired.
+class ExpirationTimeDoGetScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    *server = std::make_unique<ExpirationTimeServer>();
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, client->GetFlightInfo(FlightDescriptor::Command("expiration_time")));
+    std::vector<std::shared_ptr<arrow::Table>> tables;
+    // First read from all endpoints
+    for (const auto& endpoint : info->endpoints()) {
+      ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
+      ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
+      tables.push_back(table);
+    }
+    // Re-reads only from endpoints that have expiration time
+    for (const auto& endpoint : info->endpoints()) {
+      if (endpoint.expiration_time.has_value()) {
+        ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
+        ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
+        tables.push_back(table);
+      } else {
+        auto reader = client->DoGet(endpoint.ticket);
+        if (reader.ok()) {
+          return Status::Invalid(
+              "Data that doesn't have expiration time "
+              "shouldn't be readable multiple times");
+        }
+      }
+    }
+    // Re-reads after expired
+    for (const auto& endpoint : info->endpoints()) {
+      if (!endpoint.expiration_time.has_value()) {
+        continue;
+      }
+      const auto& expiration_time = endpoint.expiration_time.value();
+      if (expiration_time > Timestamp::clock::now()) {
+        std::this_thread::sleep_for(expiration_time - Timestamp::clock::now());
+      }

Review Comment:
   I wonder if this might be flaky in CI (at least in the prior loop), since if it takes more than 2 seconds somehow it'll fail above. Maybe there's no need to test the actual expiration of the ticket? Or, we can treat the expiration as a counter instead of a real expiration time to make things not dependent on the clock.
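
One possible reading of the counter idea is sketched below: the endpoint "expires" after a fixed number of reads, so the test never sleeps or consults a clock. `CountedEndpoint`, `reads_until_expired`, and `TryRead` are hypothetical names, and mapping "no expiration" to a single permitted read is an assumption based on the test server quoted above.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical per-endpoint state: "expiration" is a read budget, not a time.
struct CountedEndpoint {
  // Unset models an endpoint without expiration: readable only once.
  std::optional<std::uint32_t> reads_until_expired;
  std::uint32_t num_gets = 0;
};

// Returns true and records the read if a DoGet-like read is still allowed.
inline bool TryRead(CountedEndpoint& endpoint) {
  const std::uint32_t limit = endpoint.reads_until_expired.value_or(1);
  if (endpoint.num_gets >= limit) {
    return false;  // Budget exhausted: treat the endpoint as expired.
  }
  ++endpoint.num_gets;
  assert(endpoint.num_gets <= limit);  // Invariant: never exceed the budget.
  return true;
}
```

Because the outcome depends only on how many reads have happened, a slow CI machine cannot change the result.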





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1236034912


##########
java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java:
##########
@@ -383,13 +424,38 @@ default void beginTransaction(ActionBeginTransactionRequest request, CallContext
     listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
   }
 
+  /**
+   * Explicitly cancel a query.
+   *
+   * @param info    The FlightInfo of the query to cancel.
+   * @param context Per-call context.
+   * @param listener An interface for sending data back to the client.
+   */
+  default void cancelFlightInfo(FlightInfo info, CallContext context, StreamListener<CancelStatus> listener) {
+    listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+  }
+
+
+  /**
+   * Explicitly free resources associated with a query.
+   *
+   * @param info    The FlightInfo of the query to close.
+   * @param context Per-call context.
+   * @param listener An interface for sending data back to the client.
+   */
+  default void closeFlightInfo(FlightInfo info, CallContext context, StreamListener<Result> listener) {
+    listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+  }
+
   /**
    * Explicitly cancel a query.
    *
    * @param info     The FlightInfo of the query to cancel.
    * @param context  Per-call context.
    * @param listener Whether cancellation succeeded.
+   * @deprecated Prefer {@link #cancelFlightInfo(FlightInfo, CallContext, StreamListener)}.
    */
+  @Deprecated
   default void cancelQuery(FlightInfo info, CallContext context, StreamListener<CancelResult> listener) {
     listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());

Review Comment:
   Ah, whoops. I'll do that!





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1239773329


##########
format/Flight.proto:
##########
@@ -183,6 +183,15 @@ message Action {
   bytes body = 2;
 }
 
+/*
+ * The request of the RenewFlightEndpoint action.
+ *
+ * The request should be stored in Action.body.
+ */
+message RenewFlightEndpointRequest {

Review Comment:
   Hmm, at this point should we just have request/result messages for all 3 actions?





[GitHub] [arrow] kou commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1602126016

   OK. Let's remove expired checks.




[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1228899238


##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   It makes sense.
   
   I've added `ResultStream::Drain()`.
   
   I noticed that we may not need to share `ReadResult()`. Why do all `DoAction` results in Flight SQL wrap the result message in `google.protobuf.Any`? Can we put `FlightInfo`/`FlightEndpoint` in `Result.body` directly?
   
   If we need to wrap `FlightInfo`/`FlightEndpoint` in `google.protobuf.Any`, servers that implement `CancelFlightInfo`/`CloseFlightInfo`/`RefreshFlightEndpoint` need to use Protobuf headers. I think we want to avoid that.
   (We could avoid it by adding `ServerBase::{CancelFlightInfo,CloseFlightInfo,RefreshFlightEndpoint}`, as `FlightSqlServerBase` does, but providing default `ServerBase::{DoAction,ListActions}` implementations that handle `CancelFlightInfo`/`CloseFlightInfo`/`RefreshFlightEndpoint` may not be a good idea...)
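
The comment above does not show what `ResultStream::Drain()` does. Assuming it consumes and discards any results left on the stream after the first one has been read, the pattern could look roughly like the sketch below. `FakeResultStream` and `ReadSingleResult` are hypothetical, in-memory stand-ins for illustration and do not reflect the actual Arrow signatures.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stream; not the Arrow Flight ResultStream.
class FakeResultStream {
 public:
  explicit FakeResultStream(std::vector<std::string> bodies)
      : bodies_(std::move(bodies)) {}

  // Returns the next body, or std::nullopt once the stream is exhausted.
  std::optional<std::string> Next() {
    if (position_ >= bodies_.size()) {
      return std::nullopt;
    }
    return bodies_[position_++];
  }

  // Consumes and discards all remaining results; returns how many there were.
  std::size_t Drain() {
    std::size_t discarded = 0;
    while (Next().has_value()) {
      ++discarded;
    }
    assert(position_ == bodies_.size());  // The stream is now fully consumed.
    return discarded;
  }

 private:
  std::vector<std::string> bodies_;
  std::size_t position_ = 0;
};

// Reads the first result, then drains the rest so the call finishes cleanly.
inline std::optional<std::string> ReadSingleResult(FakeResultStream& stream) {
  std::optional<std::string> first = stream.Next();
  stream.Drain();
  return first;
}
```

Draining after the first result means a server that sends unexpected extra results does not leave the call half-read.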



##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {

Review Comment:
   OK. I've removed `Action`.



##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {

Review Comment:
   Oh...



##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   I moved `CancelResult` to the top level to avoid the `ActionCancelFlightInfoResult::...` prefix.
   Is that OK?



##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   Can we use `CancelStatus` or something similar instead of `CancelResult`, to avoid using `Result` in both `CancelResult` and `ActionCancelFlightInfoResult`?





[GitHub] [arrow] conbench-apache-arrow[bot] commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1624541893

   Conbench analyzed the 6 benchmark runs on commit `0b7bd74d`.
   
   There were 7 benchmark results indicating a performance regression:
   
   - Commit Run on `arm64-m6g-linux-compute` at [2023-07-03 02:19:06Z](http://conbench.ursa.dev/compare/runs/4cd8f10aeefc482da4f33113dd50789e...0954b2f425b847e8a86642af3f0dea6d/)
     - [params=32768/10000, source=cpp-micro, suite=parquet-encoding-benchmark](http://conbench.ursa.dev/compare/benchmarks/064a1c4bfcf67d618000abe9cf75a940...064a2307e204749c8000fdb730a88530)
   
   - Commit Run on `arm64-t4g-linux-compute` at [2023-07-03 02:27:32Z](http://conbench.ursa.dev/compare/runs/0b89bcf3acf649a690ebc0a787693f02...6c1e525dbea94449816688f85fa7a3be/)
     - [params=complex_expression/batch_size:100000/real_time, source=cpp-micro, suite=arrow-acero-project-benchmark](http://conbench.ursa.dev/compare/benchmarks/064a1c6643f478218000282e990746f8...064a23267167758580004554e5a7820c)
   - and 5 more (see the report linked below)
   
   The [full Conbench report](https://github.com/apache/arrow/runs/14845896070) has more details.




[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1225045025


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {

Review Comment:
   We may want to remove `Cancel`:
   
   ```suggestion
     enum Result {
   ```
   
   Or we may want to move this to top-level and return this enum directly instead of wrapping by `ActionCancelFlightInfoResult` message.



##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {

Review Comment:
   We may want to remove `Action`:
   
   ```suggestion
   message CancelFlightInfoResult {
   ```





[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1228299572


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {

Review Comment:
   +1 from me too





[GitHub] [arrow] lidavidm commented on pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1599649017

   Ok, looks like the Java-C++ integration passes locally for me




[GitHub] [arrow] lidavidm commented on pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1599634692

   Java support: https://github.com/kou/arrow/pull/12
   
   Though I haven't tested the integration bits yet




[GitHub] [arrow] lidavidm commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1600722357

   The Java integration passes locally for me, so I do think it's a timing issue. It might be enough to just test that the expiration time and refresh get passed through and parsed properly, without necessarily implementing the expiration semantics, since what we care about in the integration tests is just that the data makes it through properly. (And then we might use fixed timestamps for the expirations so all implementations know what values to expect.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231703149


##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   Ah, sorry.
   
   The `Any` was a choice by the original Dremio developers; it gives you more type safety in Protobuf but is largely redundant, yes. (That said, even when wrapped in `Any`, we can still provide a function/method to parse it, right?)





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1226922526


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {

Review Comment:
   +1 from me





[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1229979356


##########
go/arrow/flight/client.go:
##########
@@ -348,6 +352,92 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+func (c *client) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (result CancelFlightInfoResult, err error) {
+	var action flight.Action
+	action.Type = CancelFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return
+	}
+	if err = proto.Unmarshal(res.Body, &result); err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {

Review Comment:
   Should we add a documentation comment saying that this will return `err == io.EOF`? Or should we add `err = nil` before the return here?



##########
go/arrow/flight/server.go:
##########
@@ -49,9 +49,36 @@ type (
 	Action                          = flight.Action
 	ActionType                      = flight.ActionType
 	Result                          = flight.Result
+	CancelFlightInfoResult    = flight.CancelFlightInfoResult
+	CancelResult = flight.CancelResult

Review Comment:
   indentation



##########
go/arrow/flight/flightsql/client_test.go:
##########
@@ -60,6 +60,18 @@ func (m *FlightServiceClientMock) AuthenticateBasicToken(_ context.Context, user
 	return args.Get(0).(context.Context), args.Error(1)
 }
 
+func (m *FlightServiceClientMock) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (CancelFlightInfoResult, error) {
+	panic("not implemented") // TODO: Implement

Review Comment:
   Can this just follow the pattern above of `args := m.Called(info, opts)` and `return args.Get(0).(CancelFlightInfoResult), args.Error(1)`, so we can use the mock to create the tests?



##########
go/arrow/flight/client.go:
##########
@@ -348,6 +352,92 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+func (c *client) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (result CancelFlightInfoResult, err error) {
+	var action flight.Action
+	action.Type = CancelFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return
+	}
+	if err = proto.Unmarshal(res.Body, &result); err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (err error) {
+	var action flight.Action
+	action.Type = CloseFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *client) RefreshFlightEndpoint(ctx context.Context, endpoint *FlightEndpoint, opts ...grpc.CallOption) (*FlightEndpoint, error) {
+	var err error
+
+	var action flight.Action
+	action.Type = RefreshFlightEndpointActionType
+	if action.Body, err = proto.Marshal(endpoint); err != nil {
+		return nil, err
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return nil, err
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return nil, err
+	}
+	var refreshedEndpoint FlightEndpoint
+	if err = proto.Unmarshal(res.Body, &refreshedEndpoint); err != nil {
+		return nil, err
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &refreshedEndpoint, nil
+}

Review Comment:
   Since we're returning nil here, the previous functions should also return nil on success rather than the `io.EOF` they return now.



##########
go/arrow/flight/client.go:
##########
@@ -348,6 +352,92 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+func (c *client) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (result CancelFlightInfoResult, err error) {
+	var action flight.Action
+	action.Type = CancelFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return
+	}
+	if err = proto.Unmarshal(res.Body, &result); err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (err error) {
+	var action flight.Action
+	action.Type = CloseFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {

Review Comment:
   Same comment as above: should we document that this returns `io.EOF` on success, or should we return nil?





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1232769570


##########
cpp/src/arrow/flight/types.h:
##########
@@ -630,6 +697,11 @@ class ARROW_FLIGHT_EXPORT ResultStream {
 
   ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.")
   Status Next(std::unique_ptr<Result>* info);
+
+  /// \brief Read and drop the all rest messages to get error from a server.

Review Comment:
   Thanks!





[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235515948


##########
go/arrow/flight/client.go:
##########
@@ -348,10 +352,87 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+// Ensure the result of a DoAction is fully consumed
+func ReadUntilEOF(stream FlightService_DoActionClient) error {

Review Comment:
   Ah, I see you're using it in flightsql, never mind. It's fine to leave this exported, though we should update the docstring. Godoc format says that the comment should start with the name of the function, so this comment should be of the form:
   
   `ReadUntilEOF will drain a stream until either an error is returned or EOF is encountered and nil is returned.`
   





[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235535699


##########
go/arrow/flight/flightsql/server.go:
##########
@@ -640,7 +668,17 @@ type Server interface {
 	// EndTransaction commits or rollsback a transaction
 	EndTransaction(context.Context, ActionEndTransactionRequest) error
 	// CancelQuery attempts to explicitly cancel a query
+	// Deprecated: Since 13.0.0. If you can require all clients
+	// use 13.0.0 or later, you can use only CancelFlightInfo and
+	// you don't need to use CancelQuery. Otherwise, you may need
+	// to use CancelQuery and/or CancelFlightInfo.

Review Comment:
   Giving `flightSqlServer` its own implementation of `CancelQuery`, as I mentioned above, also has a nice effect here: users can switch entirely to `CancelFlightInfo` safely regardless of the client, because the server will do the right thing.





[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1239544932


##########
go/arrow/flight/flightsql/server.go:
##########
@@ -927,10 +937,78 @@ func (f *flightSqlServer) ListActions(_ *flight.Empty, stream flight.FlightServi
 	return nil
 }
 
+func cancelStatusToCancelResult(status flight.CancelStatus) CancelResult {
+	switch status {
+	case flight.CancelStatusUnspecified:
+		return CancelResultUnspecified
+	case flight.CancelStatusCancelled:
+		return CancelResultCancelled
+	case flight.CancelStatusCancelling:
+		return CancelResultCancelling
+	case flight.CancelStatusNotCancellable:
+		return CancelResultNotCancellable
+	default:
+		return CancelResultUnspecified
+	}
+}
+
 func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightService_DoActionServer) error {
 	var anycmd anypb.Any
 
 	switch cmd.Type {
+	case flight.CancelFlightInfoActionType:
+		var (
+			info   flight.FlightInfo
+			result flight.CancelFlightInfoResult
+			err    error
+		)
+
+		if err = proto.Unmarshal(cmd.Body, &info); err != nil {
+			return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CancelFlightInfo: %s", err.Error())
+		}
+
+		if result, err = f.srv.CancelFlightInfo(stream.Context(), &info); err != nil {
+			return err
+		}

Review Comment:
   Thanks! It works!





[GitHub] [arrow] lidavidm commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1604213513

   I'll fix up the Java in a moment




[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1225051046


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   We should use a prefix.
   
   https://protobuf.dev/programming-guides/style/#enums
   
   > Prefer prefixing enum values instead of surrounding them in an enclosing message.





[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1240416844


##########
format/Flight.proto:
##########
@@ -183,6 +183,15 @@ message Action {
   bytes body = 2;
 }
 
+/*
+ * The request of the RenewFlightEndpoint action.
+ *
+ * The request should be stored in Action.body.
+ */
+message RenewFlightEndpointRequest {

Review Comment:
   OK!





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1224357589


##########
dev/archery/archery/integration/runner.py:
##########
@@ -435,6 +435,37 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
             description="Ensure FlightInfo.ordered is supported.",
             skip={"JS", "C#", "Rust"},
         ),
+        Scenario(
+            "expiration_time:do_get",
+            description=("Ensure FlightEndpoint.expiration_time with "
+                         "DoGet is working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:list_actions",
+            description=("Ensure FlightEndpoint.expiration_time related "
+                         "pre-defined actions is working with ListActions "
+                         "as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:cancel_flight_info",
+            description=("Ensure FlightEndpoint.expiration_time and "
+                         "CancelFlightInfo are working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:refresh_flight_info",
+            description=("Ensure FlightEndpoint.expiration_time and "
+                         "RefreshFlightEndpoint are working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:close_flight_info",
+            description=("Ensure FlightEndpoint.expiration_time and "
+                         "CloseFlightInfo are working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},

Review Comment:
   I can work on Java next week



##########
cpp/src/arrow/flight/sql/client.h:
##########
@@ -324,6 +324,7 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   Status Rollback(const FlightCallOptions& options, const Savepoint& savepoint);
 
   /// \brief Explicitly cancel a query.
+  /// Deprecated since 13.0.0. Use FlightClient::CancelFlightInfo() instead.

Review Comment:
   Doxygen has a [`\deprecated`](https://www.doxygen.nl/manual/commands.html#cmddeprecated) command



##########
format/Flight.proto:
##########
@@ -321,6 +348,13 @@ message FlightEndpoint {
    * represent redundant and/or load balanced services.
    */
   repeated Location location = 2;
+
+  /*
+   * Expiration time of this stream. If present, clients may assume
+   * they can retry DoGet requests. Otherwise, clients should avoid
+   * retrying DoGet requests.

Review Comment:
   I wonder if wording it as "it is application-defined whether DoGet requests may be retried" might be better since technically that is how it was before



##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   We should drain the rest of the DoAction stream since that's the only way we can get the server error (if any). You can see how this is done in the Flight SQL codebase: https://github.com/apache/arrow/blob/766e25440250dde9117e19245389badb5ed7678c/cpp/src/arrow/flight/sql/client.cc#L131-L137



##########
cpp/src/arrow/flight/serialization_internal.cc:
##########
@@ -147,6 +169,15 @@ Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint)
   for (const Location& location : endpoint.locations) {
     RETURN_NOT_OK(ToProto(location, pb_endpoint->add_location()));
   }
+  if (endpoint.expiration_time.has_value()) {
+    const auto expiration_time = endpoint.expiration_time.value();
+    const auto since_epoch = expiration_time.time_since_epoch();
+    const auto since_epoch_ns =
+        std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
+    auto pb_expiration_time = pb_endpoint->mutable_expiration_time();
+    pb_expiration_time->set_seconds(since_epoch_ns / 1000000000);

Review Comment:
   nit: constant for the nanoseconds to seconds conversion? (Do we already have one?)





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1225047372


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   `CANCEL_RESULT_` prefix may be redundant.
   
   ```suggestion
       UNSPECIFIED = 0;
   ```





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1233188550


##########
go/arrow/flight/client.go:
##########
@@ -348,6 +352,92 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+func (c *client) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (result CancelFlightInfoResult, err error) {
+	var action flight.Action
+	action.Type = CancelFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return
+	}
+	if err = proto.Unmarshal(res.Body, &result); err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {

Review Comment:
   Good catch! I missed `err = nil`.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231704110


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   > Can we use CancelStatus or something instead of CancelResult to use Result in CancelResult and ActionCancelFlightInfoResult?
   
   Sorry, what are you proposing here?
   
   > I moved CancelResult to top-level to reduce ActionCancelFlightInfoResult::... prefix.
   
   I think that's fine.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231716218


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   > BTW, we have `message Result` in `Flight.proto`. It may confuse us with Arrow `Result`...
   
   Yes, the Flight one actually happened to come first, but it's unfortunate...
   
   > No. I just want to avoid Result duplication in CancelFlightInfoResult and CancelResult.
   
   Ok. I think `status` is also good then.





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231715010


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   BTW, we have `message Result` in `Flight.proto`. It may confuse us with Arrow `Result`...





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1229522053


##########
cpp/src/arrow/flight/sql/server.h:
##########
@@ -594,7 +594,16 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
   virtual Status EndTransaction(const ServerCallContext& context,
                                 const ActionEndTransactionRequest& request);
 
+  /// \brief Attempt to explicitly cancel a FlightInfo.
+  /// \param[in] context  The call context.
+  /// \param[in] info     The FlightInfo to cancel.
+  /// \return             The cancellation result.
+  virtual arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
+      const ServerCallContext& context, const FlightInfo& info);

Review Comment:
   (And the unit test doesn't need to test anything complicated, just that the RPC makes it through to the right handler.)





[GitHub] [arrow] lidavidm commented on pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1591090561

   For Flight SQL, we could mark the old Cancel action deprecated in Protobuf and in the APIs?




[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231697076


##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   @lidavidm ping





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231714524


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   > Ah, to avoid confusion with Arrow `Result`?
   
   No. I just want to avoid `Result` duplication in `CancelFlightInfoResult` and `CancelResult`.
   
   





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1228965197


##########
cpp/src/arrow/flight/sql/server.h:
##########
@@ -594,7 +594,16 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
   virtual Status EndTransaction(const ServerCallContext& context,
                                 const ActionEndTransactionRequest& request);
 
+  /// \brief Attempt to explicitly cancel a FlightInfo.
+  /// \param[in] context  The call context.
+  /// \param[in] info     The FlightInfo to cancel.
+  /// \return             The cancellation result.
+  virtual arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
+      const ServerCallContext& context, const FlightInfo& info);

Review Comment:
   This is not tested.
   
   We may want to add `flight_sql:expiration_time:*` integration tests that use `CancelFlightInfo`/`CloseFlightInfo`/`RefreshFlightEndpoint`. But most of the code would duplicate the `expiration_time:*` integration tests...
   
   @lidavidm What do you think about this?





[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235921540


##########
go/arrow/flight/client.go:
##########
@@ -348,10 +352,87 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+// Ensure the result of a DoAction is fully consumed
+func ReadUntilEOF(stream FlightService_DoActionClient) error {

Review Comment:
   Yes. I wanted to use this both here and in flightsql.
   
   > Godoc format says that the comment should start with the name of the function.
   
   Oh, sorry. I missed it. I'll use your suggestion as-is. Thanks!



##########
go/arrow/flight/flightsql/server.go:
##########
@@ -511,8 +511,36 @@ func (BaseServer) BeginSavepoint(context.Context, ActionBeginSavepointRequest) (
 	return nil, status.Error(codes.Unimplemented, "BeginSavepoint not implemented")
 }
 
-func (BaseServer) CancelQuery(context.Context, ActionCancelQueryRequest) (CancelResult, error) {
-	return CancelResultUnspecified, status.Error(codes.Unimplemented, "CancelQuery not implemented")
+func (b *BaseServer) CancelQuery(context context.Context, request ActionCancelQueryRequest) (CancelResult, error) {
+	result, err := b.CancelFlightInfo(context, request.GetInfo())
+	if err != nil {
+		return CancelResultUnspecified, err
+	}

Review Comment:
   Oh, thanks.
   I removed `CancelQuery` because doing so simplifies our implementation. Is that acceptable for existing Go users, given that `CancelQuery` is an experimental API?



##########
go/arrow/flight/flightsql/server.go:
##########
@@ -640,7 +668,17 @@ type Server interface {
 	// EndTransaction commits or rollsback a transaction
 	EndTransaction(context.Context, ActionEndTransactionRequest) error
 	// CancelQuery attempts to explicitly cancel a query
+	// Deprecated: Since 13.0.0. If you can require all clients
+	// use 13.0.0 or later, you can use only CancelFlightInfo and
+	// you don't need to use CancelQuery. Otherwise, you may need
+	// to use CancelQuery and/or CancelFlightInfo.

Review Comment:
   You're right.
   The documentation is wrong. Users only need to implement `CancelFlightInfo`; it then handles both `CancelQuery` and `CancelFlightInfo` requests from clients.
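
   A minimal sketch of that idea, not the actual flightsql code: one `CancelFlightInfo` implementation serves both action types. Every type here (`CancelStatus`, `flightInfo`, `canceller`, `demoServer`) and the `handleAction` dispatcher are hypothetical, and the action type strings are used for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// CancelStatus is a hypothetical stand-in for the cancellation status enum.
type CancelStatus int

const (
	CancelStatusUnspecified CancelStatus = iota
	CancelStatusCancelled
)

// flightInfo is a hypothetical, minimal stand-in for a FlightInfo.
type flightInfo struct{ ticket string }

// canceller is the only method a server author implements in this sketch.
type canceller interface {
	CancelFlightInfo(info flightInfo) (CancelStatus, error)
}

// handleAction routes both the newer and the older action type to the same
// CancelFlightInfo implementation. The action type strings are illustrative.
func handleAction(srv canceller, actionType string, info flightInfo) (CancelStatus, error) {
	switch actionType {
	case "CancelFlightInfo", "CancelQuery":
		return srv.CancelFlightInfo(info)
	default:
		return CancelStatusUnspecified, errors.New("unknown action: " + actionType)
	}
}

// demoServer records which tickets were cancelled.
type demoServer struct{ cancelled map[string]bool }

func (s *demoServer) CancelFlightInfo(info flightInfo) (CancelStatus, error) {
	s.cancelled[info.ticket] = true
	return CancelStatusCancelled, nil
}

func main() {
	srv := &demoServer{cancelled: map[string]bool{}}
	for _, actionType := range []string{"CancelFlightInfo", "CancelQuery"} {
		status, err := handleAction(srv, actionType, flightInfo{ticket: actionType})
		fmt.Println(actionType, status == CancelStatusCancelled, err)
	}
}
```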



##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2 seconds expiration time
+/// 3. 3 seconds expiration time
+///
+/// The client can't read data from the first endpoint multiple times
+/// but can read data from the second and third endpoints. The client
+/// can't re-read data from the second endpoint 2 seconds later. The
+/// client can't re-read data from the third endpoint 3 seconds
+/// later.
+///
+/// The client can cancel a returned FlightInfo by pre-defined
+/// CancelFlightInfo action. The client can't read data from endpoints
+/// even within 3 seconds after the action.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
+/// action. The client can read data from endpoints multiple times
+/// within more 10 seconds after the action.
+///
+/// The client can close a returned FlightInfo explicitly by
+/// pre-defined CloseFlightInfo action. The client can't read data
+/// from endpoints even within 3 seconds after the action.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration_time)
+        : expiration_time(expiration_time) {}
+
+    std::optional<Timestamp> expiration_time;
+    uint32_t num_gets = 0;
+    bool cancelled = false;
+    bool closed = false;
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    status.num_gets++;
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }

Review Comment:
   Ah, sorry. It should have returned one result. I'll fix it.
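
   One possible way to fold the per-endpoint outcomes into a single result, sketched in Go under assumptions rather than taken from this PR. The `cancelAll` helper, the `CancelStatus` type, and the folding rule (any unknown or already-cancelled endpoint makes the whole result "not cancellable") are all hypothetical.

```go
package main

import "fmt"

// CancelStatus is a hypothetical stand-in for the cancellation status enum.
type CancelStatus int

const (
	CancelStatusUnspecified CancelStatus = iota
	CancelStatusCancelled
	CancelStatusNotCancellable
)

// cancelAll marks every listed endpoint as cancelled and folds the outcomes
// into one status: NotCancellable if any index is out of range or already
// cancelled, Cancelled if every endpoint was newly cancelled, and
// Unspecified if no endpoints were given.
func cancelAll(cancelled []bool, indexes []int) CancelStatus {
	status := CancelStatusUnspecified
	for _, i := range indexes {
		if i < 0 || i >= len(cancelled) || cancelled[i] {
			status = CancelStatusNotCancellable
			continue
		}
		cancelled[i] = true
		if status == CancelStatusUnspecified {
			status = CancelStatusCancelled
		}
	}
	return status
}

func main() {
	state := make([]bool, 3)
	// The first call cancels all three endpoints.
	fmt.Println(cancelAll(state, []int{0, 1, 2}) == CancelStatusCancelled)
	// The second call finds them already cancelled.
	fmt.Println(cancelAll(state, []int{0, 1, 2}) == CancelStatusNotCancellable)
}
```

   The caller then serializes that one status into a single `Result` instead of pushing one `Result` per endpoint.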



##########
go/arrow/flight/client.go:
##########
@@ -348,10 +352,87 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+// Ensure the result of a DoAction is fully consumed
+func ReadUntilEOF(stream FlightService_DoActionClient) error {
+	for {
+		_, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		} else if err != nil {
+			return err
+		}
+	}
+}
+
+func (c *client) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (result CancelFlightInfoResult, err error) {
+	var action flight.Action
+	action.Type = CancelFlightInfoActionType
+	action.Body, err = proto.Marshal(info)
+	if err != nil {
+		return
+	}
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return
+	}
+	if err = proto.Unmarshal(res.Body, &result); err != nil {
+		return
+	}
+	err = ReadUntilEOF(stream)
+	return
+}
+
 func (c *client) Close() error {
 	c.FlightServiceClient = nil
 	if cl, ok := c.conn.(io.Closer); ok {
 		return cl.Close()
 	}
 	return nil
 }
+
+func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (err error) {
+	var action flight.Action
+	action.Type = CloseFlightInfoActionType
+	action.Body, err = proto.Marshal(info)
+	if err != nil {
+		return
+	}
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	err = ReadUntilEOF(stream)
+	return

Review Comment:
   Thanks!
   I didn't know that we can mix named return values with the `return XXX` style.
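
   A small sketch of that Go rule: a function with named results may use both a bare `return` and a `return` with explicit values. The `halve` function is a hypothetical example and not part of this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// halve returns n/2 for even n and an error for odd n. Its results are
// named, yet it uses both return styles.
func halve(n int) (result int, err error) {
	if n%2 != 0 {
		// Explicit values are allowed even though the results are named.
		return 0, errors.New("odd input")
	}
	result = n / 2
	// A bare return sends back the current values of result and err.
	return
}

func main() {
	fmt.Println(halve(4)) // prints "2 <nil>"
	fmt.Println(halve(3)) // prints "0 odd input"
}
```

   Returning explicit values on the error paths makes each exit point state exactly what the caller receives, which is easier to follow than relying on the current value of `err`.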



##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2 seconds expiration time
+/// 3. 3 seconds expiration time
+///
+/// The client can't read data from the first endpoint multiple times
+/// but can read data from the second and third endpoints. The client
+/// can't re-read data from the second endpoint 2 seconds later. The
+/// client can't re-read data from the third endpoint 3 seconds
+/// later.
+///
+/// The client can cancel a returned FlightInfo by pre-defined
+/// CancelFlightInfo action. The client can't read data from endpoints
+/// even within 3 seconds after the action.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
+/// action. The client can read data from endpoints multiple times
+/// within more 10 seconds after the action.
+///
+/// The client can close a returned FlightInfo explicitly by
+/// pre-defined CloseFlightInfo action. The client can't read data
+/// from endpoints even within 3 seconds after the action.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration_time)
+        : expiration_time(expiration_time) {}
+
+    std::optional<Timestamp> expiration_time;
+    uint32_t num_gets = 0;
+    bool cancelled = false;
+    bool closed = false;
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    status.num_gets++;
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }
+    } else if (action.type == ActionType::kCloseFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        if (!index_result.ok()) {
+          continue;
+        }
+        auto index = *index_result;
+        statuses_[index].closed = true;
+      }
+    } else if (action.type == ActionType::kRefreshFlightEndpoint.type) {
+      ARROW_ASSIGN_OR_RAISE(auto endpoint,
+                            FlightEndpoint::Deserialize(std::string_view(*action.body)));
+      ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(endpoint.ticket.ticket));
+      if (statuses_[index].cancelled) {
+        return Status::Invalid("Invalid flight: canceled: ", endpoint.ticket.ticket);
+      }
+      endpoint.ticket.ticket += ": refreshed (+ 10 seconds)";
+      endpoint.expiration_time = Timestamp::clock::now() + std::chrono::seconds{10};
+      statuses_[index].expiration_time = endpoint.expiration_time.value();
+      ARROW_ASSIGN_OR_RAISE(auto serialized, endpoint.SerializeToString());
+      results.push_back(Result{Buffer::FromString(std::move(serialized))});
+    } else {
+      return Status::Invalid("Unknown action: ", action.type);
+    }
+    *result_stream = std::make_unique<SimpleResultStream>(std::move(results));
+    return Status::OK();
+  }
+
+  Status ListActions(const ServerCallContext& context,
+                     std::vector<ActionType>* actions) override {
+    *actions = {
+        ActionType::kCancelFlightInfo,
+        ActionType::kCloseFlightInfo,
+        ActionType::kRefreshFlightEndpoint,
+    };
+    return Status::OK();
+  }
+
+ private:
+  void AddEndpoint(std::vector<FlightEndpoint>& endpoints, std::string ticket,
+                   std::optional<Timestamp> expiration_time) {
+    endpoints.push_back(FlightEndpoint{
+        {std::to_string(statuses_.size()) + ": " + ticket}, {}, expiration_time});
+    statuses_.emplace_back(expiration_time);
+  }
+
+  arrow::Result<uint32_t> ExtractIndexFromTicket(const std::string& ticket) {
+    auto index_string = arrow::internal::SplitString(ticket, ':', 2)[0];
+    uint32_t index;
+    if (!arrow::internal::ParseUnsigned(index_string.data(), index_string.length(),
+                                        &index)) {
+      return Status::KeyError("Invalid flight: no index: ", ticket);
+    }
+    if (index >= statuses_.size()) {
+      return Status::KeyError("Invalid flight: out of index: ", ticket);
+    }
+    return index;
+  }
+
+  std::shared_ptr<Schema> BuildSchema() {
+    return arrow::schema({arrow::field("number", arrow::uint32(), false)});
+  }
+
+  std::vector<EndpointStatus> statuses_;
+};
+
+/// \brief The expiration time scenario - DoGet.
+///
+/// This tests that the client can read data that isn't expired yet
+/// multiple times and can't read data after it's expired.
+class ExpirationTimeDoGetScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    *server = std::make_unique<ExpirationTimeServer>();
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, client->GetFlightInfo(FlightDescriptor::Command("expiration_time")));
+    std::vector<std::shared_ptr<arrow::Table>> tables;
+    // First read from all endpoints
+    for (const auto& endpoint : info->endpoints()) {
+      ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
+      ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
+      tables.push_back(table);
+    }
+    // Re-reads only from endpoints that have expiration time
+    for (const auto& endpoint : info->endpoints()) {
+      if (endpoint.expiration_time.has_value()) {
+        ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
+        ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
+        tables.push_back(table);
+      } else {
+        auto reader = client->DoGet(endpoint.ticket);
+        if (reader.ok()) {
+          return Status::Invalid(
+              "Data that doesn't have expiration time "
+              "shouldn't be readable multiple times");
+        }
+      }
+    }
+    // Re-reads after expired
+    for (const auto& endpoint : info->endpoints()) {
+      if (!endpoint.expiration_time.has_value()) {
+        continue;
+      }
+      const auto& expiration_time = endpoint.expiration_time.value();
+      if (expiration_time > Timestamp::clock::now()) {
+        std::this_thread::sleep_for(expiration_time - Timestamp::clock::now());
+      }

Review Comment:
   Yes, this might be flaky on a very slow single-core CI machine.
   But I hope that 2 seconds is enough time to process one simple `GetFlightInfo` request and five `DoGet()` requests (3 first reads + 2 re-reads) on most machines.
   
   > Maybe there's no need to test the actual expiration of the ticket?
   
   That's an option. How about adding a "We may remove this check if this is flaky in CI" comment here? Then we'll remove this check if it fails in our CI.
   
   > Or, we can treat the expiration as a counter instead of a real expiration time to make things not dependent on the clock.
   
   Hmm. It'll work, but it may confuse us when we maintain our integration test, because the expiration time is defined as a timestamp, not a counter.





[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1236017273


##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2 seconds expiration time
+/// 3. 3 seconds expiration time
+///
+/// The client can't read data from the first endpoint multiple times
+/// but can read data from the second and third endpoints. The client
+/// can't re-read data from the second endpoint 2 seconds later. The
+/// client can't re-read data from the third endpoint 3 seconds
+/// later.
+///
+/// The client can cancel a returned FlightInfo by pre-defined
+/// CancelFlightInfo action. The client can't read data from endpoints
+/// even within 3 seconds after the action.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
+/// action. The client can read data from endpoints multiple times
+/// within more 10 seconds after the action.
+///
+/// The client can close a returned FlightInfo explicitly by
+/// pre-defined CloseFlightInfo action. The client can't read data
+/// from endpoints even within 3 seconds after the action.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration_time)
+        : expiration_time(expiration_time) {}
+
+    std::optional<Timestamp> expiration_time;
+    uint32_t num_gets = 0;
+    bool cancelled = false;
+    bool closed = false;
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    status.num_gets++;
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }

Review Comment:
   Done.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235794087


##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2 seconds expiration time
+/// 3. 3 seconds expiration time
+///
+/// The client can't read data from the first endpoint multiple times
+/// but can read data from the second and third endpoints. The client
+/// can't re-read data from the second endpoint 2 seconds later. The
+/// client can't re-read data from the third endpoint 3 seconds
+/// later.
+///
+/// The client can cancel a returned FlightInfo by pre-defined
+/// CancelFlightInfo action. The client can't read data from endpoints
+/// even within 3 seconds after the action.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
+/// action. The client can read data from endpoints multiple times
+/// within more 10 seconds after the action.
+///
+/// The client can close a returned FlightInfo explicitly by
+/// pre-defined CloseFlightInfo action. The client can't read data
+/// from endpoints even within 3 seconds after the action.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration_time)
+        : expiration_time(expiration_time) {}
+
+    std::optional<Timestamp> expiration_time;
+    uint32_t num_gets = 0;
+    bool cancelled = false;
+    bool closed = false;
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    status.num_gets++;
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }

Review Comment:
   Actually, the client only ever reads the first result anyway, right? So was this meant to send only a single result?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1225020575


##########
format/Flight.proto:
##########
@@ -321,6 +348,13 @@ message FlightEndpoint {
    * represent redundant and/or load balanced services.
    */
   repeated Location location = 2;
+
+  /*
+   * Expiration time of this stream. If present, clients may assume
+   * they can retry DoGet requests. Otherwise, clients should avoid
+   * retrying DoGet requests.

Review Comment:
   Thanks!
   Suggestions on wording are always welcome. :-)



##########
dev/archery/archery/integration/runner.py:
##########
@@ -435,6 +435,37 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
             description="Ensure FlightInfo.ordered is supported.",
             skip={"JS", "C#", "Rust"},
         ),
+        Scenario(
+            "expiration_time:do_get",
+            description=("Ensure FlightEndpoint.expiration_time with "
+                         "DoGet is working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:list_actions",
+            description=("Ensure FlightEndpoint.expiration_time related "
+                         "pre-defined actions is working with ListActions "
+                         "as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:cancel_flight_info",
+            description=("Ensure FlightEndpoint.expiration_time and "
+                         "CancelFlightInfo are working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:refresh_flight_info",
+            description=("Ensure FlightEndpoint.expiration_time and "
+                         "RefreshFlightEndpoint are working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},
+        ),
+        Scenario(
+            "expiration_time:close_flight_info",
+            description=("Ensure FlightEndpoint.expiration_time and "
+                         "CloseFlightInfo are working as expected."),
+            skip={"Java", "Go", "JS", "C#", "Rust"},

Review Comment:
   Thanks!



##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   Oh, I didn't notice that.
   Would it be OK to move `DrainResultStream()` and `ReadResult()` to `flight/client.h` and use them in both `flight/client.cc` and `flight/sql/client.cc`?



##########
cpp/src/arrow/flight/serialization_internal.cc:
##########
@@ -147,6 +169,15 @@ Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint)
   for (const Location& location : endpoint.locations) {
     RETURN_NOT_OK(ToProto(location, pb_endpoint->add_location()));
   }
+  if (endpoint.expiration_time.has_value()) {
+    const auto expiration_time = endpoint.expiration_time.value();
+    const auto since_epoch = expiration_time.time_since_epoch();
+    const auto since_epoch_ns =
+        std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
+    auto pb_expiration_time = pb_endpoint->mutable_expiration_time();
+    pb_expiration_time->set_seconds(since_epoch_ns / 1000000000);

Review Comment:
   Good catch!
   It seems that we can use `std::nano::den`.
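
The same idea, replacing the literal `1000000000` with a named constant, can be sketched in Go, where `time.Second` plays the role that `std::nano::den` plays in C++. The helper name `splitEpochNanos` is hypothetical, and the handling of pre-epoch values is an assumption based on the protobuf `Timestamp` convention that `nanos` is non-negative; it is not taken from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// nanosPerSecond is derived from the standard library instead of a
// hand-written literal.
const nanosPerSecond = int64(time.Second)

// splitEpochNanos splits nanoseconds since the Unix epoch into the
// (seconds, nanos) pair used by a protobuf Timestamp.
func splitEpochNanos(ns int64) (seconds int64, nanos int32) {
	seconds = ns / nanosPerSecond
	rem := ns % nanosPerSecond
	if rem < 0 {
		// Go truncates toward zero, so normalize pre-epoch values to
		// keep nanos in the range [0, 1e9).
		seconds--
		rem += nanosPerSecond
	}
	return seconds, int32(rem)
}

func main() {
	s, n := splitEpochNanos(1500000000)
	fmt.Println(s, n)
}
```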



##########
cpp/src/arrow/flight/sql/client.h:
##########
@@ -324,6 +324,7 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   Status Rollback(const FlightCallOptions& options, const Savepoint& savepoint);
 
   /// \brief Explicitly cancel a query.
+  /// Deprecated since 13.0.0. Use FlightClient::CancelFlightInfo() instead.

Review Comment:
   Done!





[GitHub] [arrow] kou commented on pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1590463643

   @zeroshade I've pushed the Go implementation, but it's still rough and some TODOs remain in the Flight SQL part...
   (We need to consider how to store `FlightInfo`/`FlightEndpoint` for `CancelFlightInfo`/`CloseFlightInfo`/`RefreshFlightEndpoint` in the Flight SQL part. See also: https://github.com/apache/arrow/pull/36009#discussion_r1228899238 )
   
   TODO (help is welcome!):
   * Update the documentation
   * Clean up the C++ implementation
   * Clean up the Go implementation
   * Consider how to keep backward compatibility in the Flight SQL API




[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1226563773


##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   I think `ReadResult` can only be in an `_internal.h` header because it uses Protobuf. But `DrainResultStream` could become `ResultStream::Drain`.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1229521567


##########
cpp/src/arrow/flight/sql/server.h:
##########
@@ -594,7 +594,16 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
   virtual Status EndTransaction(const ServerCallContext& context,
                                 const ActionEndTransactionRequest& request);
 
+  /// \brief Attempt to explicitly cancel a FlightInfo.
+  /// \param[in] context  The call context.
+  /// \param[in] info     The FlightInfo to cancel.
+  /// \return             The cancellation result.
+  virtual arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
+      const ServerCallContext& context, const FlightInfo& info);

Review Comment:
   I don't think we need separate integration tests for Flight SQL if we're testing them in Flight RPC. Just unit tests would be sufficient.





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1233188564


##########
go/arrow/flight/client.go:
##########
@@ -348,6 +352,92 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+func (c *client) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (result CancelFlightInfoResult, err error) {
+	var action flight.Action
+	action.Type = CancelFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return
+	}
+	if err = proto.Unmarshal(res.Body, &result); err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (err error) {
+	var action flight.Action
+	action.Type = CloseFlightInfoActionType
+	if action.Body, err = proto.Marshal(info); err != nil {
+		return
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *client) RefreshFlightEndpoint(ctx context.Context, endpoint *FlightEndpoint, opts ...grpc.CallOption) (*FlightEndpoint, error) {
+	var err error
+
+	var action flight.Action
+	action.Type = RefreshFlightEndpointActionType
+	if action.Body, err = proto.Marshal(endpoint); err != nil {
+		return nil, err
+	}
+
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return nil, err
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return nil, err
+	}
+	var refreshedEndpoint FlightEndpoint
+	if err = proto.Unmarshal(res.Body, &refreshedEndpoint); err != nil {
+		return nil, err
+	}
+	for {
+		_, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &refreshedEndpoint, nil
+}

Review Comment:
   You're right.





[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1233451759


##########
go/arrow/flight/flightsql/client_test.go:
##########
@@ -60,6 +60,18 @@ func (m *FlightServiceClientMock) AuthenticateBasicToken(_ context.Context, user
 	return args.Get(0).(context.Context), args.Error(1)
 }
 
+func (m *FlightServiceClientMock) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (CancelFlightInfoResult, error) {
+	panic("not implemented") // TODO: Implement

Review Comment:
   OK. I've changed these to use the mock and added tests for them.





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231707677


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   > Sorry, what are you proposing here?
   
   I meant to suggest something like the following (or another, better name):
   
   ```diff
   index cda7dae31..5ccdd5b9a 100644
   --- a/format/Flight.proto
   +++ b/format/Flight.proto
   @@ -196,20 +196,20 @@ message Result {
     *
     * This is used by CancelFLightInfoResult.result.
     */
   -enum CancelResult {
   +enum CancelStatus {
      // The cancellation status is unknown. Servers should avoid using
      // this value (send a NOT_FOUND error if the requested query is
      // not known). Clients can retry the request.
   -  CANCEL_RESULT_UNSPECIFIED = 0;
   +  CANCEL_STATUS_UNSPECIFIED = 0;
      // The cancellation request is complete. Subsequent requests with
      // the same payload may return CANCELLED or a NOT_FOUND error.
   -  CANCEL_RESULT_CANCELLED = 1;
   +  CANCEL_STATUS_CANCELLED = 1;
      // The cancellation request is in progress. The client may retry
      // the cancellation request.
   -  CANCEL_RESULT_CANCELLING = 2;
   +  CANCEL_STATUS_CANCELLING = 2;
      // The query is not cancellable. The client should not retry the
      // cancellation request.
   -  CANCEL_RESULT_NOT_CANCELLABLE = 3;
   +  CANCEL_STATUS_NOT_CANCELLABLE = 3;
    }
    
    /*
   @@ -218,7 +218,7 @@ enum CancelResult {
     * The result should be stored in Result.body.
     */
    message CancelFlightInfoResult {
   -  CancelResult result = 1;
   +  CancelStatus status = 1;
    }
    
    /*
   ```
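
The comments on the four enum values imply a simple client-side retry rule. The sketch below uses the numeric values from the diff above, but the Go type `cancelStatus`, its constants, and `mayRetryCancel` are hypothetical names chosen for illustration, not the generated Flight code.

```go
package main

import "fmt"

// cancelStatus mirrors the numeric values of the proposed enum.
type cancelStatus int

const (
	cancelStatusUnspecified    cancelStatus = 0
	cancelStatusCancelled      cancelStatus = 1
	cancelStatusCancelling     cancelStatus = 2
	cancelStatusNotCancellable cancelStatus = 3
)

// mayRetryCancel reports whether the proto comments allow the client
// to send the cancellation request again.
func mayRetryCancel(s cancelStatus) bool {
	switch s {
	case cancelStatusUnspecified, cancelStatusCancelling:
		// Unknown or still in progress: the client may retry.
		return true
	default:
		// Cancelled (done) or not cancellable: no retry.
		return false
	}
}

func main() {
	for s := cancelStatusUnspecified; s <= cancelStatusNotCancellable; s++ {
		fmt.Println(int(s), mayRetryCancel(s))
	}
}
```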





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231721304


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   OK!





[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235528328


##########
go/arrow/flight/flightsql/server.go:
##########
@@ -511,8 +511,36 @@ func (BaseServer) BeginSavepoint(context.Context, ActionBeginSavepointRequest) (
 	return nil, status.Error(codes.Unimplemented, "BeginSavepoint not implemented")
 }
 
-func (BaseServer) CancelQuery(context.Context, ActionCancelQueryRequest) (CancelResult, error) {
-	return CancelResultUnspecified, status.Error(codes.Unimplemented, "CancelQuery not implemented")
+func (b *BaseServer) CancelQuery(context context.Context, request ActionCancelQueryRequest) (CancelResult, error) {
+	result, err := b.CancelFlightInfo(context, request.GetInfo())
+	if err != nil {
+		return CancelResultUnspecified, err
+	}

Review Comment:
   This won't do what you expect. Go embedding doesn't work like C++ polymorphism: this method isn't a virtual call and will *always* call the BaseServer version of `CancelFlightInfo`. A user would need to override `CancelQuery` in order to call their own `CancelFlightInfo`, so it's probably not worthwhile to have this base implementation.
   
   You should move this implementation to `flightSqlServer` and then call `f.srv.CancelFlightInfo` like we do for the other actions.
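
The dispatch behavior described here can be reproduced in a few lines. The types below are simplified stand-ins that reuse the names from the discussion and return strings instead of Flight messages; `canceller` and `cancelQueryVia` are hypothetical names that imitate the suggested wrapper approach, in which the call goes through the user's server value.

```go
package main

import "fmt"

// BaseServer is a simplified stand-in for the embedded base type.
type BaseServer struct{}

func (b *BaseServer) CancelFlightInfo() string { return "base" }

// CancelQuery is bound to *BaseServer, so this call is resolved
// statically and never reaches a method on an outer struct.
func (b *BaseServer) CancelQuery() string { return b.CancelFlightInfo() }

// MyServer embeds BaseServer and shadows CancelFlightInfo.
type MyServer struct{ BaseServer }

func (m *MyServer) CancelFlightInfo() string { return "custom" }

// canceller is what a wrapper would hold to reach the user's method.
type canceller interface{ CancelFlightInfo() string }

// cancelQueryVia dispatches through the interface, so the outer
// type's method is the one that runs.
func cancelQueryVia(srv canceller) string { return srv.CancelFlightInfo() }

func main() {
	srv := &MyServer{}
	fmt.Println(srv.CancelQuery())   // prints "base"
	fmt.Println(cancelQueryVia(srv)) // prints "custom"
}
```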





[GitHub] [arrow] kou commented on pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1599753266

   Thanks!
   I've merged the Java implementation!
   
   (You can push your changes to this branch directly. :-)




[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1240372188


##########
format/Flight.proto:
##########
@@ -183,6 +183,15 @@ message Action {
   bytes body = 2;
 }
 
+/*
+ * The request of the RenewFlightEndpoint action.
+ *
+ * The request should be stored in Action.body.
+ */
+message RenewFlightEndpointRequest {

Review Comment:
   Ah, sorry. I'm still making changes. I'll add `CancelFlightInfoRequest` later.
   
   But I hadn't planned to add `RenewFlightEndpointResult`. Should we add it?
   
   NOTE: The ML message: https://lists.apache.org/thread/ofw6bydhyhcqqn3t8013gx0k6fbw8lvv
   
   > * Remove CloseFlightInfo (if nobody objects it)
   > * RefreshFlightEndpoint ->
   >    RenewFlightEndpoint
   > * RenewFlightEndpoint(FlightEndpoint) ->
   >   RenewFlightEndpoint(RenewFlightEndpointRequest)
   > * CancelFlightInfo(FlightInfo) ->
   >   CancelFlightInfo(CancelFlightInfoRequest)
   
   
   





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1226923008


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {

Review Comment:
   At least in Protobuf/gRPC itself, you can only return messages, not bare enums.





[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231706831


##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   > even when wrapped in Any, we can still provide a function/method to parse it right?
   
   Right.
   
   > But I would prefer not to wrap in Any, so we can keep it like this.
   
   OK! Thanks!





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231703671


##########
cpp/src/arrow/flight/client.cc:
##########
@@ -569,6 +569,36 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
   return DoAction(options, action).Value(results);
 }
 
+arrow::Result<std::unique_ptr<ActionCancelFlightInfoResult>>
+FlightClient::CancelFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
+  ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
+  Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
+  ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
+  ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());

Review Comment:
   But I would prefer not to wrap in Any, so we can keep it like this.





[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1236027481


##########
java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java:
##########
@@ -383,13 +424,38 @@ default void beginTransaction(ActionBeginTransactionRequest request, CallContext
     listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
   }
 
+  /**
+   * Explicitly cancel a query.
+   *
+   * @param info    The FlightInfo of the query to cancel.
+   * @param context Per-call context.
+   * @param listener An interface for sending data back to the client.
+   */
+  default void cancelFlightInfo(FlightInfo info, CallContext context, StreamListener<CancelStatus> listener) {
+    listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+  }
+
+
+  /**
+   * Explicitly free resources associated with a query.
+   *
+   * @param info    The FlightInfo of the query to close.
+   * @param context Per-call context.
+   * @param listener An interface for sending data back to the client.
+   */
+  default void closeFlightInfo(FlightInfo info, CallContext context, StreamListener<Result> listener) {
+    listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+  }
+
   /**
    * Explicitly cancel a query.
    *
    * @param info     The FlightInfo of the query to cancel.
    * @param context  Per-call context.
    * @param listener Whether cancellation succeeded.
+   * @deprecated Prefer {@link #cancelFlightInfo(FlightInfo, CallContext, StreamListener)}.
    */
+  @Deprecated
   default void cancelQuery(FlightInfo info, CallContext context, StreamListener<CancelResult> listener) {
     listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());

Review Comment:
   @lidavidm How about providing a default implementation that uses `cancelFlightInfo`, so that users don't need to implement both `cancelQuery()` and `cancelFlightInfo()` to support both the `CancelQuery` and `CancelFlightInfo` actions?
   (The C++/Go implementations do this.)





[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1241028705


##########
java/flight/flight-core/src/main/java/org/apache/arrow/flight/RenewFlightEndpointRequest.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import org.apache.arrow.flight.impl.Flight;
+
+/** A request to extend the expiration time of a FlightEndpoint. */
+public class RenewFlightEndpointRequest {
+  private final FlightEndpoint endpoint;
+
+  public RenewFlightEndpointRequest(FlightEndpoint endpoint) {
+    this.endpoint = Objects.requireNonNull(endpoint);
+  }
+
+  RenewFlightEndpointRequest(Flight.RenewFlightEndpointRequest proto) throws URISyntaxException {
+    this(new FlightEndpoint(proto.getEndpoint()));
+  }
+
+  public FlightEndpoint getFlightEndpoint() {

Review Comment:
   @lidavidm How about renaming this to `getEndpoint()`, because `FlightInfo` uses `getEndpoints()`, not `getFlightEndpoints()`?





[GitHub] [arrow] lidavidm commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1600785339

   And this time not all of them failed, so it certainly seems like a timing issue.




[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1236898026


##########
java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java:
##########
@@ -383,13 +424,38 @@ default void beginTransaction(ActionBeginTransactionRequest request, CallContext
     listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
   }
 
+  /**
+   * Explicitly cancel a query.
+   *
+   * @param info    The FlightInfo of the query to cancel.
+   * @param context Per-call context.
+   * @param listener An interface for sending data back to the client.
+   */
+  default void cancelFlightInfo(FlightInfo info, CallContext context, StreamListener<CancelStatus> listener) {
+    listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+  }
+
+
+  /**
+   * Explicitly free resources associated with a query.
+   *
+   * @param info    The FlightInfo of the query to close.
+   * @param context Per-call context.
+   * @param listener An interface for sending data back to the client.
+   */
+  default void closeFlightInfo(FlightInfo info, CallContext context, StreamListener<Result> listener) {
+    listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+  }
+
   /**
    * Explicitly cancel a query.
    *
    * @param info     The FlightInfo of the query to cancel.
    * @param context  Per-call context.
    * @param listener Whether cancellation succeeded.
+   * @deprecated Prefer {@link #cancelFlightInfo(FlightInfo, CallContext, StreamListener)}.
    */
+  @Deprecated
   default void cancelQuery(FlightInfo info, CallContext context, StreamListener<CancelResult> listener) {
     listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());

Review Comment:
   Done!





[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235509287


##########
go/arrow/flight/client.go:
##########
@@ -348,10 +352,87 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+// Ensure the result of a DoAction is fully consumed
+func ReadUntilEOF(stream FlightService_DoActionClient) error {

Review Comment:
   Do we need this to be exported? If not, I'd prefer to keep it unexported, unless you think it will be specifically useful for users.
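
A minimal sketch of what an unexported drain helper could look like. It is an illustration only: `resultStream` and `fakeStream` are hypothetical stand-ins so that the example is self-contained, and the real stream type in the diff is `FlightService_DoActionClient`, whose `Recv` is assumed to return a result message rather than raw bytes.

```go
package main

import (
	"errors"
	"io"
)

// resultStream is a hypothetical stand-in for the DoAction client stream;
// only Recv matters for draining.
type resultStream interface {
	Recv() ([]byte, error)
}

// readUntilEOF discards results until the stream reports io.EOF.
// It is unexported, so it stays an internal detail of the package.
func readUntilEOF(stream resultStream) error {
	for {
		_, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil // the stream ended normally
		}
		if err != nil {
			return err // any other error is propagated to the caller
		}
	}
}

// fakeStream (hypothetical) yields `remaining` results and then io.EOF.
type fakeStream struct{ remaining int }

func (s *fakeStream) Recv() ([]byte, error) {
	if s.remaining == 0 {
		return nil, io.EOF
	}
	s.remaining--
	return []byte("result"), nil
}
```

Calling `readUntilEOF(&fakeStream{remaining: 3})` consumes the three pending results and returns `nil`. Keeping the helper unexported leaves room to change or remove it later without breaking users.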





[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235610258


##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2 seconds expiration time
+/// 3. 3 seconds expiration time
+///
+/// The client can't read data from the first endpoint multiple times
+/// but can read data from the second and third endpoints. The client
+/// can't re-read data from the second endpoint 2 seconds later. The
+/// client can't re-read data from the third endpoint 3 seconds
+/// later.
+///
+/// The client can cancel a returned FlightInfo by pre-defined
+/// CancelFlightInfo action. The client can't read data from endpoints
+/// even within 3 seconds after the action.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
+/// action. The client can read data from endpoints multiple times
+/// within more 10 seconds after the action.
+///
+/// The client can close a returned FlightInfo explicitly by
+/// pre-defined CloseFlightInfo action. The client can't read data
+/// from endpoints even within 3 seconds after the action.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration_time)
+        : expiration_time(expiration_time) {}
+
+    std::optional<Timestamp> expiration_time;
+    uint32_t num_gets = 0;
+    bool cancelled = false;
+    bool closed = false;
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    status.num_gets++;
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }

Review Comment:
   Per-endpoint results may be more flexible, but I'm not sure it's a useful level of flexibility.





[GitHub] [arrow] kou merged pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou merged PR #36009:
URL: https://github.com/apache/arrow/pull/36009




[GitHub] [arrow] kou commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1606916033

   https://github.com/apache/arrow/actions/runs/5368022930/jobs/9738596690?pr=36009#step:7:14222
   
   ```text
   Unknown error
   Invalid flight: expired: 1: 2 seconds
   ```
   
   ```text
   FAILED TEST: expiration_time:do_get Java producing,  Java consuming
   ```
   
   Hmm. It seems that the Java implementation takes more than 2 seconds for `GetFlightInfo` and `DoGet` on CI. I think that's too slow, even for CI. Anyway, I'll increase the expiration time.




[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1239833814


##########
format/Flight.proto:
##########
@@ -183,6 +183,15 @@ message Action {
   bytes body = 2;
 }
 
+/*
+ * The request of the RenewFlightEndpoint action.
+ *
+ * The request should be stored in Action.body.
+ */
+message RenewFlightEndpointRequest {

Review Comment:
   Ah, I saw the ML message just now, ignore me.





[GitHub] [arrow] github-actions[bot] commented on pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1584202622

   :warning: GitHub issue #35500 **has been automatically assigned in GitHub** to PR creator.




[GitHub] [arrow] kou commented on pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1590236283

   @zeroshade Thanks for the offer! I have the Go implementation locally, and I just need to implement one more integration test to cover all the cases implemented in the C++ implementation. I should be able to complete it and push it to this branch today. The Go implementation still needs to be cleaned up, though. Please review and/or push improvements to this branch after I push the Go implementation.




[GitHub] [arrow] lidavidm commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1600677355

   Hmm, I wonder if the expirations are because Java runs too slowly in CI.




[GitHub] [arrow] kou commented on pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1594371643

   > For Flight SQL, we could mark the old Cancel action deprecated in Protobuf and in the APIs?
   
   It makes sense. Done.




[GitHub] [arrow] kou commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1599985992

   Hmm. Some integration tests failed:
   
   https://github.com/apache/arrow/actions/runs/5328790706/jobs/9653800237?pr=36009#step:7:14620
   
   ```text
    -- Arrow Fatal Error --
   NotImplemented: StatementExecution failed: Flight returned unimplemented error, with message: CancelFlightInfo not implemented
   ```
   
   
   https://github.com/apache/arrow/actions/runs/5328790706/jobs/9653800237?pr=36009#step:7:14592
   
   ```text
   Invalid flight: expired
   ```




[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235609211


##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2 seconds expiration time
+/// 3. 3 seconds expiration time
+///
+/// The client can't read data from the first endpoint multiple times
+/// but can read data from the second and third endpoints. The client
+/// can't re-read data from the second endpoint 2 seconds later. The
+/// client can't re-read data from the third endpoint 3 seconds
+/// later.
+///
+/// The client can cancel a returned FlightInfo by pre-defined
+/// CancelFlightInfo action. The client can't read data from endpoints
+/// even within 3 seconds after the action.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
+/// action. The client can read data from endpoints multiple times
+/// within more 10 seconds after the action.
+///
+/// The client can close a returned FlightInfo explicitly by
+/// pre-defined CloseFlightInfo action. The client can't read data
+/// from endpoints even within 3 seconds after the action.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration_time)
+        : expiration_time(expiration_time) {}
+
+    std::optional<Timestamp> expiration_time;
+    uint32_t num_gets = 0;
+    bool cancelled = false;
+    bool closed = false;
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds",
+                Timestamp::clock::now() + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    status.num_gets++;
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }

Review Comment:
   Hmm, this exposes something a little underspecified: the Flight SQL equivalent always returned a single result. Should we stick to that?
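
One way to read the single-result option is sketched below. The aggregation policy is an assumption made purely for illustration (the thread does not settle on one), and `CancelStatus`, `endpointState` and `cancelAll` are simplified, hypothetical stand-ins rather than the generated Protobuf types.

```go
package main

// CancelStatus mirrors the four states named in the diff; this Go type is a
// simplified stand-in for the generated enum.
type CancelStatus int

const (
	CancelStatusUnspecified CancelStatus = iota
	CancelStatusCancelled
	CancelStatusCancelling
	CancelStatusNotCancellable
)

// endpointState tracks whether one endpoint has already been cancelled.
type endpointState struct{ cancelled bool }

// cancelAll marks every known endpoint as cancelled and returns ONE status
// for the whole FlightInfo. Assumed policy: Cancelled if at least one
// endpoint was newly cancelled, otherwise NotCancellable.
func cancelAll(states map[int]*endpointState, indexes []int) CancelStatus {
	cancelledAny := false
	for _, i := range indexes {
		st, ok := states[i]
		if !ok || st.cancelled {
			continue // unknown or already cancelled: nothing to do
		}
		st.cancelled = true
		cancelledAny = true
	}
	if cancelledAny {
		return CancelStatusCancelled
	}
	return CancelStatusNotCancellable
}
```

Under this policy, the first call for a FlightInfo returns `CancelStatusCancelled` and a repeated call returns `CancelStatusNotCancellable`, so the client handles a single value regardless of how many endpoints the FlightInfo contains.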





[GitHub] [arrow] kou commented on pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1594404409

   Updated documentation.




[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1237281980


##########
go/arrow/flight/flightsql/server.go:
##########
@@ -511,8 +511,36 @@ func (BaseServer) BeginSavepoint(context.Context, ActionBeginSavepointRequest) (
 	return nil, status.Error(codes.Unimplemented, "BeginSavepoint not implemented")
 }
 
-func (BaseServer) CancelQuery(context.Context, ActionCancelQueryRequest) (CancelResult, error) {
-	return CancelResultUnspecified, status.Error(codes.Unimplemented, "CancelQuery not implemented")
+func (b *BaseServer) CancelQuery(context context.Context, request ActionCancelQueryRequest) (CancelResult, error) {
+	result, err := b.CancelFlightInfo(context, request.GetInfo())
+	if err != nil {
+		return CancelResultUnspecified, err
+	}

Review Comment:
   What we could do is check whether the server they passed has a `CancelQuery` method and, if it does, dispatch to that. Otherwise, default to calling `CancelFlightInfo`.
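
The dispatch idea can be sketched with an interface type assertion. The method names follow the diff, but the signatures are simplified and every type here (`flightInfoCanceller`, `queryCanceller`, `modernServer`, `legacyServer`) is a hypothetical stand-in, not the actual `flightsql` API.

```go
package main

// CancelResult mirrors the result names used in the diff.
type CancelResult int

const (
	CancelResultUnspecified CancelResult = iota
	CancelResultCancelled
	CancelResultCancelling
	CancelResultNotCancellable
)

// flightInfoCanceller is the method every server is assumed to provide.
type flightInfoCanceller interface {
	CancelFlightInfo(query string) (CancelResult, error)
}

// queryCanceller is the optional, deprecated method.
type queryCanceller interface {
	CancelQuery(query string) (CancelResult, error)
}

// dispatchCancel prefers the legacy CancelQuery when the server defines it
// and falls back to CancelFlightInfo otherwise.
func dispatchCancel(srv flightInfoCanceller, query string) (CancelResult, error) {
	if legacy, ok := srv.(queryCanceller); ok {
		return legacy.CancelQuery(query)
	}
	return srv.CancelFlightInfo(query)
}

// modernServer implements only the new method.
type modernServer struct{}

func (modernServer) CancelFlightInfo(string) (CancelResult, error) {
	return CancelResultCancelled, nil
}

// legacyServer additionally implements the deprecated method.
type legacyServer struct{ modernServer }

func (legacyServer) CancelQuery(string) (CancelResult, error) {
	return CancelResultCancelling, nil
}
```

One caveat with this pattern: if a base type that every server embeds already defines `CancelQuery`, the promoted method makes the assertion succeed for every server, so the check only distinguishes implementations when the base type does not provide the legacy method.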





[GitHub] [arrow] kou commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1602143533

   Ah, should we remove the following checks too?
   
   We can read multiple times from endpoints that have expiration time:
   
   ```diff
   diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc
   index f253f8db7..b8a75ad6d 100644
   --- a/cpp/src/arrow/flight/integration_tests/test_integration.cc
   +++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc
   @@ -617,26 +617,20 @@ class ExpirationTimeDoGetScenario : public Scenario {
        ARROW_ASSIGN_OR_RAISE(
            auto info, client->GetFlightInfo(FlightDescriptor::Command("expiration_time")));
        std::vector<std::shared_ptr<arrow::Table>> tables;
   -    // First read from all endpoints
        for (const auto& endpoint : info->endpoints()) {
          ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
          ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
   -      tables.push_back(table);
   -    }
   -    // Re-reads only from endpoints that have expiration time
   -    for (const auto& endpoint : info->endpoints()) {
   -      if (endpoint.expiration_time.has_value()) {
   -        ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
   -        ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
   -        tables.push_back(table);
   +      if (tables.size() == 0) {
   +        if (endpoint.expiration_time.has_value()) {
   +          return Status::Invalid("The first endpoint must not have expiration time");
   +        }
          } else {
   -        auto reader = client->DoGet(endpoint.ticket);
   -        if (reader.ok()) {
   -          return Status::Invalid(
   -              "Data that doesn't have expiration time "
   -              "shouldn't be readable multiple times");
   +        if (!endpoint.expiration_time.has_value()) {
   +          return Status::Invalid("The ", tables.size(),
   +                                 "-th endpoint must have expiration time");
            }
          }
   +      tables.push_back(table);
        }
        ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));
    
   @@ -645,13 +639,9 @@ class ExpirationTimeDoGetScenario : public Scenario {
        ARROW_ASSIGN_OR_RAISE(auto builder,
                              RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
        auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
   -    // First reads
        ARROW_RETURN_NOT_OK(number_builder->Append(0));
        ARROW_RETURN_NOT_OK(number_builder->Append(1));
        ARROW_RETURN_NOT_OK(number_builder->Append(2));
   -    // Re-reads only from endpoints that have expiration time
   -    ARROW_RETURN_NOT_OK(number_builder->Append(1));
   -    ARROW_RETURN_NOT_OK(number_builder->Append(2));
        ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
        std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
            expected_record_batch};
   ```
   
   We can read from refreshed endpoints:
   
   ```diff
   diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc
   index f253f8db7..5f212ba2d 100644
   --- a/cpp/src/arrow/flight/integration_tests/test_integration.cc
   +++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc
   @@ -779,13 +779,6 @@ class ExpirationTimeRefreshFlightEndpointScenario : public Scenario {
      Status RunClient(std::unique_ptr<FlightClient> client) override {
        ARROW_ASSIGN_OR_RAISE(auto info,
                              client->GetFlightInfo(FlightDescriptor::Command("expiration")));
   -    std::vector<std::shared_ptr<arrow::Table>> tables;
   -    // First read from all endpoints
   -    for (const auto& endpoint : info->endpoints()) {
   -      ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
   -      ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
   -      tables.push_back(table);
   -    }
        // Refresh all endpoints that have expiration time
        std::vector<FlightEndpoint> refreshed_endpoints;
        Timestamp max_expiration_time;
   @@ -811,56 +804,7 @@ class ExpirationTimeRefreshFlightEndpointScenario : public Scenario {
          }
          refreshed_endpoints.push_back(std::move(refreshed_endpoint));
        }
   -    // Expire all not refreshed endpoints
   -    {
   -      std::vector<Timestamp> refreshed_expiration_times;
   -      for (const auto& endpoint : refreshed_endpoints) {
   -        refreshed_expiration_times.push_back(endpoint.expiration_time.value());
   -      }
   -      std::sort(refreshed_expiration_times.begin(), refreshed_expiration_times.end());
   -      if (refreshed_expiration_times[0] < max_expiration_time) {
   -        return Status::Invalid(
   -            "One or more refreshed expiration time "
   -            "are shorter than original expiration time\n",
   -            "Original:  ", max_expiration_time.time_since_epoch().count(), "\n",
   -            "Refreshed: ", refreshed_expiration_times[0].time_since_epoch().count(),
   -            "\n");
   -      }
   -      if (max_expiration_time > Timestamp::clock::now()) {
   -        std::this_thread::sleep_for(max_expiration_time - Timestamp::clock::now());
   -      }
   -    }
   -    // Re-reads only from refreshed endpoints
   -    for (const auto& endpoint : refreshed_endpoints) {
   -      ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
   -      ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
   -      tables.push_back(table);
   -    }
   -    ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));
    
   -    // Build expected table
   -    auto schema = arrow::schema({arrow::field("number", arrow::uint32(), false)});
   -    ARROW_ASSIGN_OR_RAISE(auto builder,
   -                          RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
   -    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
   -    // First reads
   -    ARROW_RETURN_NOT_OK(number_builder->Append(0));
   -    ARROW_RETURN_NOT_OK(number_builder->Append(1));
   -    ARROW_RETURN_NOT_OK(number_builder->Append(2));
   -    // Re-reads only from refreshed endpoints
   -    ARROW_RETURN_NOT_OK(number_builder->Append(1));
   -    ARROW_RETURN_NOT_OK(number_builder->Append(2));
   -    ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
   -    std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
   -        expected_record_batch};
   -    ARROW_ASSIGN_OR_RAISE(auto expected_table,
   -                          Table::FromRecordBatches(expected_record_batches));
   -
   -    // Check read data
   -    if (!table->Equals(*expected_table)) {
   -      return Status::Invalid("Read data isn't expected\n", "Expected:\n",
   -                             expected_table->ToString(), "Actual:\n", table->ToString());
   -    }
        return Status::OK();
      }
    };
   ```




[GitHub] [arrow] lidavidm commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1602479046

   That sounds good to me.


[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1238780021


##########
go/arrow/flight/flightsql/server.go:
##########
@@ -927,10 +937,78 @@ func (f *flightSqlServer) ListActions(_ *flight.Empty, stream flight.FlightServi
 	return nil
 }
 
+func cancelStatusToCancelResult(status flight.CancelStatus) CancelResult {
+	switch status {
+	case flight.CancelStatusUnspecified:
+		return CancelResultUnspecified
+	case flight.CancelStatusCancelled:
+		return CancelResultCancelled
+	case flight.CancelStatusCancelling:
+		return CancelResultCancelling
+	case flight.CancelStatusNotCancellable:
+		return CancelResultNotCancellable
+	default:
+		return CancelResultUnspecified
+	}
+}
+
 func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightService_DoActionServer) error {
 	var anycmd anypb.Any
 
 	switch cmd.Type {
+	case flight.CancelFlightInfoActionType:
+		var (
+			info   flight.FlightInfo
+			result flight.CancelFlightInfoResult
+			err    error
+		)
+
+		if err = proto.Unmarshal(cmd.Body, &info); err != nil {
+			return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CancelFlightInfo: %s", err.Error())
+		}
+
+		if result, err = f.srv.CancelFlightInfo(stream.Context(), &info); err != nil {
+			return err
+		}

Review Comment:
   You can add a check here to see whether `f.srv` implements the `CancelQuery` method and, if so, call it instead of `CancelFlightInfo`.
   
   ```go
   // Optional interface: matched only by servers that provide CancelQuery.
   type cancelquery interface {
   	CancelQuery(context.Context, ActionCancelQueryRequest) (CancelResult, error)
   }
   
   if cancel, ok := f.srv.(cancelquery); ok {
   	// Call the method on the asserted value; the value itself is not callable.
   	cancelResult, err := cancel.CancelQuery(stream.Context(), ......)
   	if err != nil {
   		return err
   	}
   	result = ....
   } else {
   	if result, err = f.srv.CancelFlightInfo(stream.Context(), &info); err != nil {
   		return err
   	}
   }
   ```
   
   Or something like that.
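   
   As a self-contained illustration of the optional-interface check described above, here is a minimal sketch. The names `Server`, `legacyCanceller`, `newServer`, `legacyServer`, and `dispatchCancel`, and the string-based signatures, are hypothetical stand-ins and not the real `flightsql` API.
   
   ```go
   package main
   
   import (
   	"context"
   	"fmt"
   )
   
   // Server is a hypothetical stand-in for the required server interface.
   type Server interface {
   	CancelFlightInfo(ctx context.Context, id string) (string, error)
   }
   
   // legacyCanceller is the optional interface probed at runtime.
   type legacyCanceller interface {
   	CancelQuery(ctx context.Context, id string) (string, error)
   }
   
   // newServer only implements the new method.
   type newServer struct{}
   
   func (newServer) CancelFlightInfo(_ context.Context, id string) (string, error) {
   	return "CancelFlightInfo:" + id, nil
   }
   
   // legacyServer additionally keeps its own CancelQuery.
   type legacyServer struct{ newServer }
   
   func (legacyServer) CancelQuery(_ context.Context, id string) (string, error) {
   	return "CancelQuery:" + id, nil
   }
   
   // dispatchCancel prefers CancelQuery when the server provides it and
   // falls back to CancelFlightInfo otherwise.
   func dispatchCancel(ctx context.Context, srv Server, id string) (string, error) {
   	if legacy, ok := srv.(legacyCanceller); ok {
   		return legacy.CancelQuery(ctx, id)
   	}
   	return srv.CancelFlightInfo(ctx, id)
   }
   
   func main() {
   	ctx := context.Background()
   	for _, srv := range []Server{newServer{}, legacyServer{}} {
   		res, err := dispatchCancel(ctx, srv, "q1")
   		if err != nil {
   			fmt.Println("error:", err)
   			continue
   		}
   		fmt.Println(res)
   	}
   }
   ```
   
   Note that the type assertion also succeeds for promoted methods, so this check only distinguishes servers if the embedded base type does not itself define `CancelQuery`.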



[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235511108


##########
go/arrow/flight/client.go:
##########
@@ -348,10 +352,87 @@ func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) erro
 	return c.authHandler.Authenticate(ctx, &clientAuthConn{stream})
 }
 
+// Ensure the result of a DoAction is fully consumed
+func ReadUntilEOF(stream FlightService_DoActionClient) error {
+	for {
+		_, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		} else if err != nil {
+			return err
+		}
+	}
+}
+
+func (c *client) CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (result CancelFlightInfoResult, err error) {
+	var action flight.Action
+	action.Type = CancelFlightInfoActionType
+	action.Body, err = proto.Marshal(info)
+	if err != nil {
+		return
+	}
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	res, err := stream.Recv()
+	if err != nil {
+		return
+	}
+	if err = proto.Unmarshal(res.Body, &result); err != nil {
+		return
+	}
+	err = ReadUntilEOF(stream)
+	return
+}
+
 func (c *client) Close() error {
 	c.FlightServiceClient = nil
 	if cl, ok := c.conn.(io.Closer); ok {
 		return cl.Close()
 	}
 	return nil
 }
+
+func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (err error) {
+	var action flight.Action
+	action.Type = CloseFlightInfoActionType
+	action.Body, err = proto.Marshal(info)
+	if err != nil {
+		return
+	}
+	stream, err := c.DoAction(ctx, &action, opts...)
+	if err != nil {
+		return
+	}
+	err = ReadUntilEOF(stream)
+	return

Review Comment:
   `return ReadUntilEOF(stream)`?
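   
   A minimal sketch of the suggested shape, assuming a simplified stream type: `resultStream`, `fakeStream`, `readUntilEOF`, and `closeInfo` are hypothetical names used only for illustration, not the generated gRPC client types.
   
   ```go
   package main
   
   import (
   	"errors"
   	"fmt"
   	"io"
   )
   
   // resultStream is a hypothetical stand-in for a DoAction result stream.
   type resultStream interface {
   	Recv() (string, error)
   }
   
   // fakeStream yields its messages, then err if set, otherwise io.EOF.
   type fakeStream struct {
   	msgs []string
   	err  error
   }
   
   func (s *fakeStream) Recv() (string, error) {
   	if len(s.msgs) > 0 {
   		m := s.msgs[0]
   		s.msgs = s.msgs[1:]
   		return m, nil
   	}
   	if s.err != nil {
   		return "", s.err
   	}
   	return "", io.EOF
   }
   
   // readUntilEOF drains the stream so that a trailing server error surfaces.
   func readUntilEOF(stream resultStream) error {
   	for {
   		_, err := stream.Recv()
   		if errors.Is(err, io.EOF) {
   			return nil
   		}
   		if err != nil {
   			return err
   		}
   	}
   }
   
   // closeInfo returns the drain result directly instead of assigning it
   // to a named result and using a bare return.
   func closeInfo(stream resultStream) error {
   	return readUntilEOF(stream)
   }
   
   func main() {
   	fmt.Println(closeInfo(&fakeStream{msgs: []string{"a", "b"}}))
   	fmt.Println(closeInfo(&fakeStream{err: errors.New("server failed")}))
   }
   ```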



[GitHub] [arrow] zeroshade commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235528328


##########
go/arrow/flight/flightsql/server.go:
##########
@@ -511,8 +511,36 @@ func (BaseServer) BeginSavepoint(context.Context, ActionBeginSavepointRequest) (
 	return nil, status.Error(codes.Unimplemented, "BeginSavepoint not implemented")
 }
 
-func (BaseServer) CancelQuery(context.Context, ActionCancelQueryRequest) (CancelResult, error) {
-	return CancelResultUnspecified, status.Error(codes.Unimplemented, "CancelQuery not implemented")
+func (b *BaseServer) CancelQuery(context context.Context, request ActionCancelQueryRequest) (CancelResult, error) {
+	result, err := b.CancelFlightInfo(context, request.GetInfo())
+	if err != nil {
+		return CancelResultUnspecified, err
+	}

Review Comment:
   This won't do what you expect. Go embedding doesn't work like C++ polymorphism: this method isn't a virtual call, so it will *always* call the BaseServer version of `CancelFlightInfo`. A user would need to override `CancelQuery` in order to call their own `CancelFlightInfo`, so it's probably not worthwhile to have this base implementation.
   
   You should move this implementation to `flightSqlServer` and then call `f.srv.CancelFlightInfo` like we do for the other actions. We should probably also remove `CancelQuery` from the `Server` interface here so that users ONLY have to implement `CancelFlightInfo`.
   
   If we're concerned about breaking changes, then we'd likely need to use reflection to check whether they have their own `CancelQuery` implementation and call that, otherwise falling back to calling `f.srv.CancelFlightInfo`.
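   
   The embedding behavior can be reproduced in a few lines. This is a minimal sketch with simplified, hypothetical signatures (no context or request arguments); `MyServer`, `canceller`, and `cancelVia` are illustrative names only.
   
   ```go
   package main
   
   import "fmt"
   
   // BaseServer stands in for the embedded base type.
   type BaseServer struct{}
   
   func (BaseServer) CancelFlightInfo() string { return "base" }
   
   // CancelQuery has a BaseServer receiver, so the call below is bound
   // statically to BaseServer.CancelFlightInfo.
   func (b BaseServer) CancelQuery() string { return b.CancelFlightInfo() }
   
   // MyServer embeds BaseServer and shadows CancelFlightInfo.
   type MyServer struct{ BaseServer }
   
   func (MyServer) CancelFlightInfo() string { return "custom" }
   
   // canceller is what a wrapper would hold (the role f.srv plays above).
   type canceller interface{ CancelFlightInfo() string }
   
   // cancelVia dispatches through the interface, so the outer type's
   // method is the one that runs.
   func cancelVia(srv canceller) string { return srv.CancelFlightInfo() }
   
   func main() {
   	s := MyServer{}
   	fmt.Println(s.CancelQuery()) // prints "base": the promoted method ignores MyServer's version
   	fmt.Println(cancelVia(s))    // prints "custom": interface dispatch finds MyServer's version
   }
   ```
   
   This is why calling through `f.srv` in the wrapper reaches the user's implementation, while a default method on the embedded base type cannot.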



[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1238088931


##########
go/arrow/flight/flightsql/server.go:
##########
@@ -511,8 +511,36 @@ func (BaseServer) BeginSavepoint(context.Context, ActionBeginSavepointRequest) (
 	return nil, status.Error(codes.Unimplemented, "BeginSavepoint not implemented")
 }
 
-func (BaseServer) CancelQuery(context.Context, ActionCancelQueryRequest) (CancelResult, error) {
-	return CancelResultUnspecified, status.Error(codes.Unimplemented, "CancelQuery not implemented")
+func (b *BaseServer) CancelQuery(context context.Context, request ActionCancelQueryRequest) (CancelResult, error) {
+	result, err := b.CancelFlightInfo(context, request.GetInfo())
+	if err != nil {
+		return CancelResultUnspecified, err
+	}

Review Comment:
   You mean that we should keep backward compatibility for users who are using `CancelQuery`, right?
   If so, could you push the code for it to this branch? I don't know much about the `reflect` package in Go...



[GitHub] [arrow] kou commented on pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1584207779

   It's not complete yet. See the TODO list in the description.
   Only the C++ implementation exists for now.
   I'll add the Go implementation, update the documents, and then clean up the current C++ implementation.
   I think that we need to consider how to keep backward compatibility in the Flight SQL API.
   


[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1226923867


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   Yes, the reason (in case you didn't find it) is that enums in Protobuf use C-style namespacing, which is carried over into generated code. So it will end up as `ActionCancelFlightInfoResult::[CANCEL_RESULT_]UNSPECIFIED`, and so if you ever need a second enum, you need to prefix the name.



[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231696974


##########
format/Flight.proto:
##########
@@ -189,6 +191,31 @@ message Result {
   bytes body = 1;
 }
 
+/*
+ * The result of the CancelFlightInfo action.
+ *
+ * The result should be stored in Result.body.
+ */
+message ActionCancelFlightInfoResult {
+  enum CancelResult {
+    // The cancellation status is unknown. Servers should avoid using
+    // this value (send a NOT_FOUND error if the requested query is
+    // not known). Clients can retry the request.
+    CANCEL_RESULT_UNSPECIFIED = 0;

Review Comment:
   @lidavidm ping



[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1232272534


##########
cpp/src/arrow/flight/types.h:
##########
@@ -630,6 +697,11 @@ class ARROW_FLIGHT_EXPORT ResultStream {
 
   ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.")
   Status Next(std::unique_ptr<Result>* info);
+
+  /// \brief Read and drop the all rest messages to get error from a server.

Review Comment:
   ```suggestion
     /// \brief Read and drop the remaining messages to get the error (if any) from a server.
   ```



[GitHub] [arrow] kou commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1233452355


##########
go/arrow/flight/server.go:
##########
@@ -49,9 +49,36 @@ type (
 	Action                          = flight.Action
 	ActionType                      = flight.ActionType
 	Result                          = flight.Result
+	CancelFlightInfoResult    = flight.CancelFlightInfoResult
+	CancelResult = flight.CancelResult

Review Comment:
   Oh, sorry.
   I've run `go fmt arrow/flight/server.go` in `go/`.
   
   BTW, I got many changes when I ran `go fmt ./...`. What command do you use for formatting?



[GitHub] [arrow] kou commented on a diff in pull request #36009: WIP: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1231980829


##########
cpp/src/arrow/flight/sql/server.h:
##########
@@ -594,7 +594,16 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
   virtual Status EndTransaction(const ServerCallContext& context,
                                 const ActionEndTransactionRequest& request);
 
+  /// \brief Attempt to explicitly cancel a FlightInfo.
+  /// \param[in] context  The call context.
+  /// \param[in] info     The FlightInfo to cancel.
+  /// \return             The cancellation result.
+  virtual arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
+      const ServerCallContext& context, const FlightInfo& info);

Review Comment:
   OK. Done.



[GitHub] [arrow] kou commented on pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36009:
URL: https://github.com/apache/arrow/pull/36009#issuecomment-1617052987

   The vote carried: https://lists.apache.org/thread/no26s310qn3v0n5x830d50k598fh0pvr
   
   I'll merge this.


[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][Java][FlightRPC] Add support for result set expiration

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1240397313


##########
format/Flight.proto:
##########
@@ -183,6 +183,15 @@ message Action {
   bytes body = 2;
 }
 
+/*
+ * The request of the RenewFlightEndpoint action.
+ *
+ * The request should be stored in Action.body.
+ */
+message RenewFlightEndpointRequest {

Review Comment:
   I think your plan on the ML is fine!


