You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/04/19 07:55:59 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #35178: GH-34852: [C++][FlightRPC] Add support for ordered data

lidavidm commented on code in PR #35178:
URL: https://github.com/apache/arrow/pull/35178#discussion_r1170955786


##########
format/Flight.proto:
##########
@@ -266,14 +266,28 @@ message FlightInfo {
    * In other words, an application can use multiple endpoints to
    * represent partitioned data.
    *
-   * There is no ordering defined on endpoints. Hence, if the returned
-   * data has an ordering, it should be returned in a single endpoint.
+   * If the returned data has an ordering, an application can use
+   * "FlightInfo.ordered = true" or should return all data in a
+   * single endpoint.
+   *
+   * Note that a client may ignore "FlightInfo.ordered = true". If
+   * ordering is important for an application, the application must
+   * choose one of the following:
+   *
+   * * Require that all clients read data from the returned
+   *   endpoints in order.
+   * * Return all data in a single endpoint.

Review Comment:
   Compared to the wording in Flight.rst, it is not as clear what this is trying to say



##########
cpp/src/arrow/flight/sql/example/sqlite_server.cc:
##########
@@ -305,7 +305,7 @@ class SQLiteFlightSqlServer::Impl {
                           EncodeTransactionQuery(query, command.transaction_id));
     std::vector<FlightEndpoint> endpoints{FlightEndpoint{std::move(ticket), {}}};
     ARROW_ASSIGN_OR_RAISE(auto result,
-                          FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
+                          FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

Review Comment:
   SQLite doesn't tell us whether the result set is sorted or not, so I suppose technically this should always be 'true'? (That said, this server also only ever returns one endpoint so it's moot either way.)



##########
docs/source/format/Flight.rst:
##########
@@ -90,9 +90,13 @@ A client that wishes to download the data would:
    An endpoint contains a list of locations (server addresses) where
    this data can be retrieved from, and a ``Ticket``, an opaque binary
    token that the server will use to identify the data being
-   requested. There is no ordering defined on endpoints or the data
-   within, so if the dataset is sorted, applications should return
-   data in a single endpoint.
+   requested. If ``FlightInfo.ordered`` is set, returned endpoints are
+   in the same order as the data. The client can read ordered data by
+   reading data from returned endpoints in order from front to
+   back. Note that a client may ignore ``FlightInfo.ordered``. If an
+   ordering is important and the client may ignore
+   ``FlightInfo.ordered``, applications should return data in a single
+   endpoint.

Review Comment:
   Maybe to be clear, leave in an "Otherwise, there is no ordering defined on endpoints or the data within"?



##########
dev/archery/archery/integration/runner.py:
##########
@@ -430,6 +430,11 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
             "middleware",
             description="Ensure headers are propagated via middleware.",
         ),
+        Scenario(
+            "ordered",
+            description="Ensure FlightInfo.ordered is supported.",
+            skip={"Java", "JS", "C#", "Go", "Rust"},

Review Comment:
   I can put together Java support too.



##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -271,6 +274,148 @@ class MiddlewareScenario : public Scenario {
   std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
 };
 
+/// \brief The server used for testing FlightInfo.ordered.
+///
+/// If the given command is "ordered", the server sets
+/// FlightInfo.ordered. The client that supports FlightInfo.ordered
+/// must read data from endpoints from front to back. The client that
+/// doesn't support FlightInfo.ordered may read data from endpoints in
+/// random order.
+///
+/// This scenario is passed only when the client supports
+/// FlightInfo.ordered.
+class OrderedServer : public FlightServerBase {
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    const auto ordered = (descriptor.type == FlightDescriptor::DescriptorType::CMD &&
+                          descriptor.cmd == "ordered");
+    auto schema = BuildSchema();
+    std::vector<FlightEndpoint> endpoints;
+    if (ordered) {
+      endpoints.push_back(FlightEndpoint{{"1"}, {}});
+      endpoints.push_back(FlightEndpoint{{"2"}, {}});
+      endpoints.push_back(FlightEndpoint{{"3"}, {}});
+    } else {
+      endpoints.push_back(FlightEndpoint{{"1"}, {}});
+      endpoints.push_back(FlightEndpoint{{"3"}, {}});
+      endpoints.push_back(FlightEndpoint{{"2"}, {}});
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, ordered));
+    *result = std::make_unique<FlightInfo>(info);
+    return Status::OK();
+  }
+
+  Status DoGet(const ServerCallContext& context, const Ticket& request,
+               std::unique_ptr<FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+                                            BuildSchema(), arrow::default_memory_pool()));
+    auto number_builder = builder->GetFieldAs<Int32Builder>(0);
+    if (request.ticket == "1") {
+      ARROW_RETURN_NOT_OK(number_builder->Append(1));
+      ARROW_RETURN_NOT_OK(number_builder->Append(2));
+      ARROW_RETURN_NOT_OK(number_builder->Append(3));
+    } else if (request.ticket == "2") {
+      ARROW_RETURN_NOT_OK(number_builder->Append(10));
+      ARROW_RETURN_NOT_OK(number_builder->Append(20));
+      ARROW_RETURN_NOT_OK(number_builder->Append(30));
+    } else if (request.ticket == "3") {
+      ARROW_RETURN_NOT_OK(number_builder->Append(100));
+      ARROW_RETURN_NOT_OK(number_builder->Append(200));
+      ARROW_RETURN_NOT_OK(number_builder->Append(300));
+    } else {
+      return Status::KeyError("Could not find flight: ", request.ticket);
+    }
+    ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+    std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+    ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+                          RecordBatchReader::Make(record_batches));
+    *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Schema> BuildSchema() {
+    return arrow::schema({arrow::field("number", arrow::int32())});
+  }
+};
+
+/// \brief The ordered scenario.
+///
+/// This tests that a client supporting FlightInfo.ordered reads data from the returned endpoints from front to back.
+class OrderedScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    server->reset(new OrderedServer());
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    ARROW_ASSIGN_OR_RAISE(auto info,
+                          client->GetFlightInfo(FlightDescriptor::Command("ordered")));
+    const auto& endpoints = info->endpoints();
+    std::vector<FlightEndpoint> ordered_endpoints(endpoints.size());
+    // If the data is ordered, read data from front to back.
+    // If the data is NOT ordered, read data from back to front.
+    if (info->ordered()) {

Review Comment:
   Shouldn't we be asserting that this is true here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org