You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/02/09 16:36:50 UTC

[arrow-cookbook] branch main updated: [C++][Flight] Add basic Flight service (#124)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git


The following commit(s) were added to refs/heads/main by this push:
     new 9711e0b  [C++][Flight] Add basic Flight service (#124)
9711e0b is described below

commit 9711e0ba626ca1575a767228dced4dfa25eb9b6e
Author: David Li <li...@gmail.com>
AuthorDate: Wed Feb 9 11:36:44 2022 -0500

    [C++][Flight] Add basic Flight service (#124)
---
 cpp/code/CMakeLists.txt |   4 +-
 cpp/code/common.cc      |   2 +-
 cpp/code/flight.cc      | 294 ++++++++++++++++++++++++++++++++++++++++++++++++
 cpp/source/flight.rst   |  85 ++++++++++++++
 cpp/source/index.rst    |   1 +
 5 files changed, 384 insertions(+), 2 deletions(-)

diff --git a/cpp/code/CMakeLists.txt b/cpp/code/CMakeLists.txt
index 2113da7..f215d91 100644
--- a/cpp/code/CMakeLists.txt
+++ b/cpp/code/CMakeLists.txt
@@ -7,7 +7,7 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
 endif()
 
 # Add Arrow
-find_package(Arrow REQUIRED COMPONENTS dataset parquet)
+find_package(Arrow REQUIRED COMPONENTS dataset flight parquet)
 
 if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
     set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
@@ -29,6 +29,7 @@ function(RECIPE TARGET)
             ${TARGET}
             arrow_shared
             arrow_dataset
+            arrow_flight
             parquet
             gtest
     )
@@ -44,3 +45,4 @@ endfunction()
 recipe(basic_arrow)
 recipe(creating_arrow_objects)
 recipe(datasets)
+recipe(flight)
diff --git a/cpp/code/common.cc b/cpp/code/common.cc
index 4a79907..9a6532b 100644
--- a/cpp/code/common.cc
+++ b/cpp/code/common.cc
@@ -117,7 +117,7 @@ void PopulateMap(const arrow::Table& table,
   std::shared_ptr<arrow::StringArray> table_outputs =
       std::dynamic_pointer_cast<arrow::StringArray>(table.column(1)->chunk(0));
   for (int64_t i = 0; i < table.num_rows(); i++) {
-    values->insert({table_names->GetString(i), table_outputs->GetString(i)});
+    (*values)[table_names->GetString(i)] = table_outputs->GetString(i);
   }
 }
 
diff --git a/cpp/code/flight.cc b/cpp/code/flight.cc
new file mode 100644
index 0000000..a702184
--- /dev/null
+++ b/cpp/code/flight.cc
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/buffer.h>
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/localfs.h>
+#include <arrow/flight/client.h>
+#include <arrow/flight/server.h>
+#include <arrow/pretty_print.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/table.h>
+#include <arrow/type.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <vector>
+
+#include "common.h"
+
+/// A Flight service that stores each uploaded dataset as a Parquet file at the
+/// root of a filesystem, keyed by file name: DoPut writes a dataset,
+/// GetFlightInfo/ListFlights describe datasets, DoGet streams one back, and the
+/// "drop_dataset" action deletes one.
+class ParquetStorageService : public arrow::flight::FlightServerBase {
+ public:
+  /// Custom action that deletes the dataset whose name is the action body.
+  const arrow::flight::ActionType kActionDropDataset{"drop_dataset", "Delete a dataset."};
+
+  /// \param root Filesystem holding the datasets, one Parquet file per dataset
+  ///   directly under its root directory.
+  explicit ParquetStorageService(std::shared_ptr<arrow::fs::FileSystem> root)
+      : root_(std::move(root)) {}
+
+  /// List every regular `*.parquet` file at the storage root as a flight.
+  /// The criteria argument is ignored.
+  arrow::Status ListFlights(
+      const arrow::flight::ServerCallContext&, const arrow::flight::Criteria*,
+      std::unique_ptr<arrow::flight::FlightListing>* listings) override {
+    arrow::fs::FileSelector selector;
+    selector.base_dir = "/";
+    ARROW_ASSIGN_OR_RAISE(auto listing, root_->GetFileInfo(selector));
+
+    std::vector<arrow::flight::FlightInfo> flights;
+    for (const auto& file_info : listing) {
+      if (!file_info.IsFile() || file_info.extension() != "parquet") continue;
+      ARROW_ASSIGN_OR_RAISE(auto info, MakeFlightInfo(file_info));
+      flights.push_back(std::move(info));
+    }
+
+    *listings = std::unique_ptr<arrow::flight::FlightListing>(
+        new arrow::flight::SimpleFlightListing(std::move(flights)));
+    return arrow::Status::OK();
+  }
+
+  /// Describe the single dataset named by a one-component PATH descriptor.
+  arrow::Status GetFlightInfo(const arrow::flight::ServerCallContext&,
+                              const arrow::flight::FlightDescriptor& descriptor,
+                              std::unique_ptr<arrow::flight::FlightInfo>* info) override {
+    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(descriptor));
+    ARROW_ASSIGN_OR_RAISE(auto flight_info, MakeFlightInfo(file_info));
+    *info = std::unique_ptr<arrow::flight::FlightInfo>(
+        new arrow::flight::FlightInfo(std::move(flight_info)));
+    return arrow::Status::OK();
+  }
+
+  /// Store an uploaded stream as a Parquet file named by the one-component
+  /// PATH descriptor. Application metadata from the client is not used.
+  arrow::Status DoPut(const arrow::flight::ServerCallContext&,
+                      std::unique_ptr<arrow::flight::FlightMessageReader> reader,
+                      std::unique_ptr<arrow::flight::FlightMetadataWriter>) override {
+    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(reader->descriptor()));
+    // Receive the whole upload before touching the filesystem, so a failed or
+    // aborted upload doesn't leave an empty file behind (which ListFlights
+    // would then fail to open as Parquet).
+    std::shared_ptr<arrow::Table> table;
+    ARROW_RETURN_NOT_OK(reader->ReadAll(&table));
+
+    ARROW_ASSIGN_OR_RAISE(auto sink, root_->OpenOutputStream(file_info.path()));
+    ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
+                                                   sink, /*chunk_size=*/65536));
+    // Close explicitly so that any error while flushing/closing the file is
+    // returned to the client rather than lost in the stream's destructor.
+    return sink->Close();
+  }
+
+  /// Stream back the dataset whose file name is the ticket. The whole file is
+  /// read into memory before streaming starts.
+  arrow::Status DoGet(const arrow::flight::ServerCallContext&,
+                      const arrow::flight::Ticket& request,
+                      std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(request.ticket));
+    std::unique_ptr<parquet::arrow::FileReader> reader;
+    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
+                                                 arrow::default_memory_pool(), &reader));
+
+    std::shared_ptr<arrow::Table> table;
+    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
+    // Note that we can't directly pass TableBatchReader to
+    // RecordBatchStream because TableBatchReader keeps a non-owning
+    // reference to the underlying Table, which would then get freed
+    // when we exit this function
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+    arrow::TableBatchReader batch_reader(*table);
+    ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
+
+    ARROW_ASSIGN_OR_RAISE(auto owning_reader, arrow::RecordBatchReader::Make(
+                                                  std::move(batches), table->schema()));
+    *stream = std::unique_ptr<arrow::flight::FlightDataStream>(
+        new arrow::flight::RecordBatchStream(owning_reader));
+
+    return arrow::Status::OK();
+  }
+
+  /// Advertise the only custom action, "drop_dataset".
+  arrow::Status ListActions(const arrow::flight::ServerCallContext&,
+                            std::vector<arrow::flight::ActionType>* actions) override {
+    *actions = {kActionDropDataset};
+    return arrow::Status::OK();
+  }
+
+  /// Execute "drop_dataset" (body = dataset file name); any other action type
+  /// is rejected with NotImplemented. On success the result stream is empty.
+  arrow::Status DoAction(const arrow::flight::ServerCallContext&,
+                         const arrow::flight::Action& action,
+                         std::unique_ptr<arrow::flight::ResultStream>* result) override {
+    if (action.type != kActionDropDataset.type) {
+      return arrow::Status::NotImplemented("Unknown action type: ", action.type);
+    }
+    if (!action.body) {
+      return arrow::Status::Invalid("drop_dataset requires the dataset name as its body");
+    }
+    // Only hand back a result stream once the deletion has actually succeeded.
+    ARROW_RETURN_NOT_OK(DoActionDropDataset(action.body->ToString()));
+    *result = std::unique_ptr<arrow::flight::ResultStream>(
+        new arrow::flight::SimpleResultStream({}));
+    return arrow::Status::OK();
+  }
+
+ private:
+  /// Build the FlightInfo for one stored Parquet file. The schema and row count
+  /// come from the Parquet metadata and the byte size from the filesystem. The
+  /// single endpoint points back at this server (localhost:port()) and uses
+  /// the file name as its ticket.
+  arrow::Result<arrow::flight::FlightInfo> MakeFlightInfo(
+      const arrow::fs::FileInfo& file_info) {
+    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(file_info));
+    std::unique_ptr<parquet::arrow::FileReader> reader;
+    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
+                                                 arrow::default_memory_pool(), &reader));
+
+    std::shared_ptr<arrow::Schema> schema;
+    ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
+
+    auto descriptor = arrow::flight::FlightDescriptor::Path({file_info.base_name()});
+
+    arrow::flight::FlightEndpoint endpoint;
+    endpoint.ticket.ticket = file_info.base_name();
+    arrow::flight::Location location;
+    ARROW_RETURN_NOT_OK(
+        arrow::flight::Location::ForGrpcTcp("localhost", port(), &location));
+    endpoint.locations.push_back(location);
+
+    int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
+    int64_t total_bytes = file_info.size();
+
+    return arrow::flight::FlightInfo::Make(*schema, descriptor, {endpoint}, total_records,
+                                           total_bytes);
+  }
+
+  /// Resolve a PATH descriptor with exactly one component to a file under the
+  /// storage root; any other descriptor shape is rejected as Invalid.
+  /// NOTE(review): the name is client-controlled and passed straight to the
+  /// filesystem (as are DoGet tickets and drop_dataset bodies); confirm the
+  /// SubTreeFileSystem in use rejects names such as ".." before exposing this.
+  arrow::Result<arrow::fs::FileInfo> FileInfoFromDescriptor(
+      const arrow::flight::FlightDescriptor& descriptor) {
+    if (descriptor.type != arrow::flight::FlightDescriptor::PATH) {
+      return arrow::Status::Invalid("Must provide PATH-type FlightDescriptor");
+    } else if (descriptor.path.size() != 1) {
+      return arrow::Status::Invalid(
+          "Must provide PATH-type FlightDescriptor with one path component");
+    }
+    return root_->GetFileInfo(descriptor.path[0]);
+  }
+
+  /// Delete the stored dataset file named `key`.
+  arrow::Status DoActionDropDataset(const std::string& key) {
+    return root_->DeleteFile(key);
+  }
+
+  /// Filesystem whose root directory holds the dataset files.
+  std::shared_ptr<arrow::fs::FileSystem> root_;
+};  // end ParquetStorageService
+
+/// End-to-end walkthrough of the ParquetStorageService recipes: start a server
+/// backed by ./flight_datasets/ (emptied first), connect, upload
+/// airquality.parquet, fetch its FlightInfo, download it, delete it, list the
+/// remaining datasets, and shut down. Each StartRecipe/EndRecipe pair delimits a
+/// snippet shown in flight.rst, so the recipe names must stay in sync with it.
+arrow::Status TestPutGetDelete() {
+  StartRecipe("ParquetStorageService::StartServer");
+  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
+  ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
+  ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
+  auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);
+
+  arrow::flight::Location server_location;
+  ARROW_RETURN_NOT_OK(
+      arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &server_location));
+
+  arrow::flight::FlightServerOptions options(server_location);
+  auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
+      new ParquetStorageService(std::move(root)));
+  ARROW_RETURN_NOT_OK(server->Init(options));
+  rout << "Listening on port " << server->port() << std::endl;
+  EndRecipe("ParquetStorageService::StartServer");
+
+  StartRecipe("ParquetStorageService::Connect");
+  arrow::flight::Location location;
+  ARROW_RETURN_NOT_OK(
+      arrow::flight::Location::ForGrpcTcp("localhost", server->port(), &location));
+
+  std::unique_ptr<arrow::flight::FlightClient> client;
+  ARROW_RETURN_NOT_OK(arrow::flight::FlightClient::Connect(location, &client));
+  rout << "Connected to " << location.ToString() << std::endl;
+  EndRecipe("ParquetStorageService::Connect");
+
+  StartRecipe("ParquetStorageService::DoPut");
+  // Open example data file to upload
+  ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
+                        FindTestDataFile("airquality.parquet"));
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
+                        fs->OpenInputFile(airquality_path));
+  std::unique_ptr<parquet::arrow::FileReader> reader;
+  ARROW_RETURN_NOT_OK(
+      parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool(), &reader));
+
+  auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
+  std::shared_ptr<arrow::Schema> schema;
+  ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
+
+  // Start the RPC call
+  std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
+  std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
+  ARROW_RETURN_NOT_OK(client->DoPut(descriptor, schema, &writer, &metadata_reader));
+
+  // Upload data
+  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
+  std::vector<int> row_groups(reader->num_row_groups());
+  std::iota(row_groups.begin(), row_groups.end(), 0);
+  ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
+  int64_t batches = 0;
+  while (true) {
+    ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
+    if (!batch) break;
+    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+    batches++;
+  }
+
+  ARROW_RETURN_NOT_OK(writer->Close());
+  rout << "Wrote " << batches << " batches" << std::endl;
+  EndRecipe("ParquetStorageService::DoPut");
+
+  StartRecipe("ParquetStorageService::GetFlightInfo");
+  std::unique_ptr<arrow::flight::FlightInfo> flight_info;
+  ARROW_RETURN_NOT_OK(client->GetFlightInfo(descriptor, &flight_info));
+  rout << flight_info->descriptor().ToString() << std::endl;
+  rout << "=== Schema ===" << std::endl;
+  std::shared_ptr<arrow::Schema> info_schema;
+  arrow::ipc::DictionaryMemo dictionary_memo;
+  ARROW_RETURN_NOT_OK(flight_info->GetSchema(&dictionary_memo, &info_schema));
+  rout << info_schema->ToString() << std::endl;
+  rout << "==============" << std::endl;
+  EndRecipe("ParquetStorageService::GetFlightInfo");
+
+  StartRecipe("ParquetStorageService::DoGet");
+  std::unique_ptr<arrow::flight::FlightStreamReader> stream;
+  ARROW_RETURN_NOT_OK(client->DoGet(flight_info->endpoints()[0].ticket, &stream));
+  std::shared_ptr<arrow::Table> table;
+  ARROW_RETURN_NOT_OK(stream->ReadAll(&table));
+  arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
+  ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
+  EndRecipe("ParquetStorageService::DoGet");
+
+  StartRecipe("ParquetStorageService::DoAction");
+  arrow::flight::Action action{"drop_dataset",
+                               arrow::Buffer::FromString("airquality.parquet")};
+  std::unique_ptr<arrow::flight::ResultStream> results;
+  ARROW_RETURN_NOT_OK(client->DoAction(action, &results));
+  rout << "Deleted dataset" << std::endl;
+  EndRecipe("ParquetStorageService::DoAction");
+
+  StartRecipe("ParquetStorageService::ListFlights");
+  std::unique_ptr<arrow::flight::FlightListing> listing;
+  ARROW_RETURN_NOT_OK(client->ListFlights(&listing));
+  while (true) {
+    std::unique_ptr<arrow::flight::FlightInfo> listed_info;
+    ARROW_RETURN_NOT_OK(listing->Next(&listed_info));
+    if (!listed_info) break;
+    rout << listed_info->descriptor().ToString() << std::endl;
+    rout << "=== Schema ===" << std::endl;
+    std::shared_ptr<arrow::Schema> listed_schema;
+    arrow::ipc::DictionaryMemo listed_memo;
+    ARROW_RETURN_NOT_OK(listed_info->GetSchema(&listed_memo, &listed_schema));
+    rout << listed_schema->ToString() << std::endl;
+    rout << "==============" << std::endl;
+  }
+  rout << "End of listing" << std::endl;
+  EndRecipe("ParquetStorageService::ListFlights");
+
+  StartRecipe("ParquetStorageService::StopServer");
+  ARROW_RETURN_NOT_OK(server->Shutdown());
+  rout << "Server shut down successfully" << std::endl;
+  EndRecipe("ParquetStorageService::StopServer");
+  return arrow::Status::OK();
+}
+
+// Drives the full start/connect/put/get/delete/list/stop walkthrough and
+// fails the test on the first non-OK Status it returns.
+TEST(ParquetStorageServiceTest, PutGetDelete) {
+  ASSERT_OK(TestPutGetDelete());
+}
diff --git a/cpp/source/flight.rst b/cpp/source/flight.rst
new file mode 100644
index 0000000..74447ac
--- /dev/null
+++ b/cpp/source/flight.rst
@@ -0,0 +1,85 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow
+Flight, an RPC library specialized for tabular datasets. For more
+about Flight, see :doc:`format/Flight`.
+
+.. contents::
+
+Simple Parquet storage service with Arrow Flight
+================================================
+
+We'll implement a service that provides a key-value store for tabular
+data, using Flight to handle uploads/requests and Parquet to store the
+actual data.
+
+First, we'll implement the service itself. For simplicity, we'll use
+the Parquet API directly instead of the :doc:`Datasets <./datasets>`
+API.
+
+.. literalinclude:: ../code/flight.cc
+   :language: cpp
+   :linenos:
+   :start-at: class ParquetStorageService
+   :end-at: end ParquetStorageService
+   :caption: Parquet storage service, server implementation
+
+With the service implemented, we can start the server:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::StartServer
+   :dedent: 2
+
+We can then create a client and connect to the server:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::Connect
+   :dedent: 2
+
+Next, we'll read an example Parquet file and upload its contents as a
+stream of record batches, which the server will store as a Parquet file.
+
+.. recipe:: ../code/flight.cc ParquetStorageService::DoPut
+   :dedent: 2
+
+Once we do so, we can retrieve the metadata for that dataset:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::GetFlightInfo
+   :dedent: 2
+
+And get the data back:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::DoGet
+   :dedent: 2
+
+Then, we'll delete the dataset:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::DoAction
+   :dedent: 2
+
+And confirm that it's been deleted:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::ListFlights
+   :dedent: 2
+
+Finally, we'll stop our server:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::StopServer
+   :dedent: 2
diff --git a/cpp/source/index.rst b/cpp/source/index.rst
index dc075ba..35e7bcd 100644
--- a/cpp/source/index.rst
+++ b/cpp/source/index.rst
@@ -30,6 +30,7 @@ serve as robust and well performing solutions to those tasks.
    basic
    create
    datasets
+   flight
 
 Indices and tables
 ==================