You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/06/20 17:45:06 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #36009: GH-35500: [C++][Go][FlightRPC] Add support for result set expiration

lidavidm commented on code in PR #36009:
URL: https://github.com/apache/arrow/pull/36009#discussion_r1235609211


##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -410,6 +413,470 @@ class OrderedScenario : public Scenario {
   }
 };
 
+/// \brief The server used for testing FlightEndpoint.expiration_time.
+///
+/// GetFlightInfo() returns a FlightInfo that has the following
+/// three FlightEndpoints:
+///
+/// 1. No expiration time
+/// 2. 2-second expiration time
+/// 3. 3-second expiration time
+///
+/// The client can read data from the first endpoint only once, but
+/// can read data from the second and third endpoints multiple times
+/// until they expire: the second endpoint can't be re-read after 2
+/// seconds, and the third endpoint can't be re-read after 3 seconds.
+///
+/// The client can cancel a returned FlightInfo with the pre-defined
+/// CancelFlightInfo action. After the action, the client can't read
+/// data from any of its endpoints, even if they have not expired yet.
+///
+/// The client can extend the expiration time of a FlightEndpoint in
+/// a returned FlightInfo with the pre-defined RefreshFlightEndpoint
+/// action. After the action, the client can read data from the
+/// endpoint multiple times for 10 more seconds.
+///
+/// The client can close a returned FlightInfo explicitly with the
+/// pre-defined CloseFlightInfo action. After the action, the client
+/// can't read data from any of its endpoints, even if they have not
+/// expired yet.
+class ExpirationTimeServer : public FlightServerBase {
+ private:
+  /// Per-endpoint bookkeeping kept by the server between calls.
+  struct EndpointStatus {
+    explicit EndpointStatus(std::optional<Timestamp> expiration)
+        : expiration_time(expiration) {}
+
+    /// Empty means the endpoint never expires (DoGet then allows a
+    /// single read instead of checking a deadline).
+    std::optional<Timestamp> expiration_time;
+    /// Number of DoGet calls that passed the validity checks.
+    uint32_t num_gets{0};
+    /// Set by the CancelFlightInfo action.
+    bool cancelled{false};
+    /// Presumably set by the CloseFlightInfo action (handler not
+    /// visible in this excerpt).
+    bool closed{false};
+  };
+
+ public:
+  ExpirationTimeServer() : FlightServerBase(), statuses_() {}
+
+  /// \brief Return a FlightInfo with three endpoints: one without an
+  /// expiration time, one expiring in 2 seconds and one in 3 seconds.
+  ///
+  /// All previously tracked endpoint state is discarded first
+  /// (statuses_.clear()); AddEndpoint is assumed to register one status
+  /// per endpoint it appends — confirm against its definition.
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    statuses_.clear();
+    auto schema = BuildSchema();
+    // Read the clock once so both expiration times are measured from the
+    // same instant and stay exactly 1 second apart.
+    const auto now = Timestamp::clock::now();
+    std::vector<FlightEndpoint> endpoints;
+    AddEndpoint(endpoints, "No expiration time", std::nullopt);
+    AddEndpoint(endpoints, "2 seconds", now + std::chrono::seconds{2});
+    AddEndpoint(endpoints, "3 seconds", now + std::chrono::seconds{3});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
+    // `info` is not used afterwards, so move it instead of copying.
+    *result = std::make_unique<FlightInfo>(std::move(info));
+    return Status::OK();
+  }
+
+  /// \brief Serve a one-row batch containing the endpoint index named by
+  /// `request`.
+  ///
+  /// Fails with KeyError when the ticket does not name a tracked
+  /// endpoint, or when the endpoint was closed, was cancelled, has
+  /// expired, or (having no expiration time) was already read once.
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
+    // The index comes from a client-supplied ticket. A stale ticket, or a
+    // DoGet issued before any GetFlightInfo, must not index past the end
+    // of statuses_ (assumed to be a std::vector, whose operator[] does no
+    // bounds check).
+    if (static_cast<size_t>(index) >= statuses_.size()) {
+      return Status::KeyError("Invalid flight: unknown endpoint: ", request.ticket);
+    }
+    auto& status = statuses_[index];
+    if (status.closed) {
+      return Status::KeyError("Invalid flight: closed: ", request.ticket);
+    }
+    if (status.cancelled) {
+      return Status::KeyError("Invalid flight: canceled: ", request.ticket);
+    }
+    if (status.expiration_time.has_value()) {
+      auto expiration_time = status.expiration_time.value();
+      if (expiration_time < Timestamp::clock::now()) {
+        return Status::KeyError("Invalid flight: expired: ", request.ticket);
+      }
+    } else {
+      if (status.num_gets > 0) {
+        return Status::KeyError("Invalid flight: can't read multiple times: ",
+                                request.ticket);
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
+    ARROW_RETURN_NOT_OK(number_builder->Append(index));
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(std::move(record_batches)));
+    *stream = std::make_unique<RecordBatchStream>(std::move(record_batch_reader));
+    // Count the read only after the stream was built, so a failure above
+    // does not consume the single read allowed for an endpoint without an
+    // expiration time.
+    status.num_gets++;
+    return Status::OK();
+  }
+
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result_stream) override {
+    std::vector<Result> results;
+    if (action.type == ActionType::kCancelFlightInfo.type) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Deserialize(std::string_view(*action.body)));
+      for (const auto& endpoint : info->endpoints()) {
+        auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
+        auto cancel_status = CancelStatus::kUnspecified;
+        if (index_result.ok()) {
+          auto index = *index_result;
+          if (statuses_[index].cancelled) {
+            cancel_status = CancelStatus::kNotCancellable;
+          } else {
+            statuses_[index].cancelled = true;
+            cancel_status = CancelStatus::kCancelled;
+          }
+        } else {
+          cancel_status = CancelStatus::kNotCancellable;
+        }
+        auto cancel_result = CancelFlightInfoResult{cancel_status};
+        ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
+        results.push_back(Result{Buffer::FromString(std::move(serialized))});
+      }

Review Comment:
   Hmm, this exposes something that is a little underspecified: this handler returns one CancelFlightInfoResult per endpoint, whereas the Flight SQL equivalent always returned a single result. Should we stick to that? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org