You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yi...@apache.org on 2022/02/10 03:18:11 UTC

[arrow] branch master updated: ARROW-15600: [C++][FlightRPC] Add minimal Flight SQL query example

This is an automated email from the ASF dual-hosted git repository.

yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c0bae8d  ARROW-15600: [C++][FlightRPC] Add minimal Flight SQL query example
c0bae8d is described below

commit c0bae8daea2ace51c64f6db38cfb3d04c5bed657
Author: David Li <li...@gmail.com>
AuthorDate: Thu Feb 10 03:16:15 2022 +0000

    ARROW-15600: [C++][FlightRPC] Add minimal Flight SQL query example
    
    Closes #12354 from lidavidm/arrow-15600
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: Yibo Cai <yi...@arm.com>
---
 cpp/examples/arrow/CMakeLists.txt        | 18 +++++++
 cpp/examples/arrow/flight_sql_example.cc | 93 ++++++++++++++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt
index 89e459f..54b7eeb 100644
--- a/cpp/examples/arrow/CMakeLists.txt
+++ b/cpp/examples/arrow/CMakeLists.txt
@@ -84,6 +84,24 @@ if(ARROW_FLIGHT)
                     EXTRA_SOURCES
                     "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc"
                     "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc")
+
+  if(ARROW_FLIGHT_SQL)  # the Flight SQL example is only added when this is set
+    if(ARROW_GRPC_USE_SHARED)  # link the shared or the static Flight SQL library
+      set(FLIGHT_SQL_EXAMPLES_LINK_LIBS arrow_flight_sql_shared)
+    else()
+      set(FLIGHT_SQL_EXAMPLES_LINK_LIBS arrow_flight_sql_static)
+    endif()
+
+    add_arrow_example(flight_sql_example
+                      DEPENDENCIES
+                      flight_sql_test_server  # test server to run the example against
+                      EXTRA_LINK_LIBS
+                      ${FLIGHT_EXAMPLES_LINK_LIBS}
+                      ${FLIGHT_SQL_EXAMPLES_LINK_LIBS}
+                      gRPC::grpc++
+                      ${ARROW_PROTOBUF_LIBPROTOBUF}
+                      ${GFLAGS_LIBRARIES})
+  endif()
 endif()
 
 if(ARROW_PARQUET AND ARROW_DATASET)
diff --git a/cpp/examples/arrow/flight_sql_example.cc b/cpp/examples/arrow/flight_sql_example.cc
new file mode 100644
index 0000000..e8eaaf9
--- /dev/null
+++ b/cpp/examples/arrow/flight_sql_example.cc
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A minimal example of executing a SQL query via Flight SQL.
+//
+// For a corresponding server to test against, this will work with the
+// SQLite example included in the test suite, "flight_sql_test_server".
+// Building this example will also build the server; then, start the
+// server before running this example.
+
+#include <cstdlib>
+#include <iostream>
+
+#include <arrow/flight/client.h>
+#include <arrow/flight/sql/client.h>
+#include <arrow/table.h>
+#include <gflags/gflags.h>
+
+namespace flight = arrow::flight;
+namespace flightsql = arrow::flight::sql;
+
+DEFINE_string(host, "", "The host of the Flight SQL server.");  // must be set; see main()
+DEFINE_int32(port, 31337, "The port of the Flight SQL server.");
+DEFINE_string(query, "SELECT * FROM intTable WHERE value >= 0", "The query to execute.");
+
+arrow::Status Main() {
+  // Build the gRPC/TCP location of the server from the -host and -port flags
+  flight::Location target;
+  ARROW_RETURN_NOT_OK(flight::Location::ForGrpcTcp(FLAGS_host, FLAGS_port, &target));
+  std::cout << "Connecting to " << target.ToString() << std::endl;
+
+  // Connect a plain Flight client, then hand it over to a Flight SQL client
+  std::unique_ptr<flight::FlightClient> base_client;
+  ARROW_RETURN_NOT_OK(flight::FlightClient::Connect(target, &base_client));
+  std::unique_ptr<flightsql::FlightSqlClient> sql_client(
+      new flightsql::FlightSqlClient(std::move(base_client)));
+
+  // Default-constructed call options, passed to every call below
+  flight::FlightCallOptions options;
+
+  // Submit the query; the returned FlightInfo describes how to fetch the results
+  std::cout << "Executing query: '" << FLAGS_query << "'" << std::endl;
+  ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(options, FLAGS_query));
+
+  // Each endpoint is one partition of the results; fetch them one after another
+  // (this could also be done in parallel). We assume every partition lives on the
+  // server we queried, which isn't true in general: the server may spread results
+  // over multiple other servers, which we would then have to connect to.
+  const auto& endpoints = info->endpoints();
+  for (const auto& partition : endpoints) {
+    // The ticket is opaque to the client; the server uses it to identify which part
+    // of the query results to return.
+    ARROW_ASSIGN_OR_RAISE(auto reader, sql_client->DoGet(options, partition.ticket));
+    // Drain the stream into a single Arrow Table (record batches could instead be
+    // processed iteratively as they arrive)
+    std::shared_ptr<arrow::Table> chunk;
+    ARROW_RETURN_NOT_OK(reader->ReadAll(&chunk));
+    std::cout << "Read one chunk:" << std::endl << chunk->ToString() << std::endl;
+  }
+
+  return arrow::Status::OK();
+}
+
+int main(int argc, char** argv) {
+  // Populate FLAGS_host / FLAGS_port / FLAGS_query from the command line
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Without a host there is nothing to connect to. Report the missing flag, but
+  // still exit successfully (for CI).
+  if (FLAGS_host.empty()) {
+    std::cerr << "Must specify the Flight SQL server host with -host" << std::endl;
+    return EXIT_SUCCESS;
+  }
+
+  const arrow::Status result = Main();
+  if (result.ok()) return EXIT_SUCCESS;
+  std::cerr << result.ToString() << std::endl;
+  return EXIT_FAILURE;
+}