You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/02/09 16:36:50 UTC
[arrow-cookbook] branch main updated: [C++][Flight] Add basic Flight service (#124)
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git
The following commit(s) were added to refs/heads/main by this push:
new 9711e0b [C++][Flight] Add basic Flight service (#124)
9711e0b is described below
commit 9711e0ba626ca1575a767228dced4dfa25eb9b6e
Author: David Li <li...@gmail.com>
AuthorDate: Wed Feb 9 11:36:44 2022 -0500
[C++][Flight] Add basic Flight service (#124)
---
cpp/code/CMakeLists.txt | 4 +-
cpp/code/common.cc | 2 +-
cpp/code/flight.cc | 294 ++++++++++++++++++++++++++++++++++++++++++++++++
cpp/source/flight.rst | 85 ++++++++++++++
cpp/source/index.rst | 1 +
5 files changed, 384 insertions(+), 2 deletions(-)
diff --git a/cpp/code/CMakeLists.txt b/cpp/code/CMakeLists.txt
index 2113da7..f215d91 100644
--- a/cpp/code/CMakeLists.txt
+++ b/cpp/code/CMakeLists.txt
@@ -7,7 +7,7 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
endif()
# Add Arrow
-find_package(Arrow REQUIRED COMPONENTS dataset parquet)
+find_package(Arrow REQUIRED COMPONENTS dataset flight parquet)
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
@@ -29,6 +29,7 @@ function(RECIPE TARGET)
${TARGET}
arrow_shared
arrow_dataset
+ arrow_flight
parquet
gtest
)
@@ -44,3 +45,4 @@ endfunction()
recipe(basic_arrow)
recipe(creating_arrow_objects)
recipe(datasets)
+recipe(flight)
diff --git a/cpp/code/common.cc b/cpp/code/common.cc
index 4a79907..9a6532b 100644
--- a/cpp/code/common.cc
+++ b/cpp/code/common.cc
@@ -117,7 +117,7 @@ void PopulateMap(const arrow::Table& table,
std::shared_ptr<arrow::StringArray> table_outputs =
std::dynamic_pointer_cast<arrow::StringArray>(table.column(1)->chunk(0));
for (int64_t i = 0; i < table.num_rows(); i++) {
- values->insert({table_names->GetString(i), table_outputs->GetString(i)});
+ (*values)[table_names->GetString(i)] = table_outputs->GetString(i);
}
}
diff --git a/cpp/code/flight.cc b/cpp/code/flight.cc
new file mode 100644
index 0000000..a702184
--- /dev/null
+++ b/cpp/code/flight.cc
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/buffer.h>
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/localfs.h>
+#include <arrow/flight/client.h>
+#include <arrow/flight/server.h>
+#include <arrow/pretty_print.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/table.h>
+#include <arrow/type.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <vector>
+
+#include "common.h"
+
+/// A Flight service that stores every uploaded dataset as a Parquet file.
+///
+/// All paths are resolved against the filesystem supplied at construction.
+/// A dataset is identified by its file name, which serves both as its
+/// single-component PATH descriptor and as its ticket.
+class ParquetStorageService : public arrow::flight::FlightServerBase {
+ public:
+  /// Action advertised by ListActions() and handled by DoAction(); the action
+  /// body carries the name of the dataset to delete.
+  const arrow::flight::ActionType kActionDropDataset{"drop_dataset", "Delete a dataset."};
+
+  /// \param root filesystem in which the Parquet files are stored
+  explicit ParquetStorageService(std::shared_ptr<arrow::fs::FileSystem> root)
+      : root_(std::move(root)) {}
+
+  /// Produce a FlightInfo for each regular file with a "parquet" extension
+  /// found by listing "/" on the storage filesystem. The criteria argument
+  /// is ignored.
+  arrow::Status ListFlights(
+      const arrow::flight::ServerCallContext&, const arrow::flight::Criteria*,
+      std::unique_ptr<arrow::flight::FlightListing>* listings) override {
+    arrow::fs::FileSelector selector;
+    selector.base_dir = "/";
+    ARROW_ASSIGN_OR_RAISE(auto listing, root_->GetFileInfo(selector));
+
+    std::vector<arrow::flight::FlightInfo> flights;
+    for (const auto& file_info : listing) {
+      if (!file_info.IsFile() || file_info.extension() != "parquet") continue;
+      ARROW_ASSIGN_OR_RAISE(auto info, MakeFlightInfo(file_info));
+      flights.push_back(std::move(info));
+    }
+
+    *listings = std::unique_ptr<arrow::flight::FlightListing>(
+        new arrow::flight::SimpleFlightListing(std::move(flights)));
+    return arrow::Status::OK();
+  }
+
+  /// Describe the single dataset named by a one-component PATH descriptor.
+  /// Any other kind of descriptor is rejected with Status::Invalid.
+  arrow::Status GetFlightInfo(const arrow::flight::ServerCallContext&,
+                              const arrow::flight::FlightDescriptor& descriptor,
+                              std::unique_ptr<arrow::flight::FlightInfo>* info) override {
+    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(descriptor));
+    ARROW_ASSIGN_OR_RAISE(auto flight_info, MakeFlightInfo(file_info));
+    *info = std::unique_ptr<arrow::flight::FlightInfo>(
+        new arrow::flight::FlightInfo(std::move(flight_info)));
+    return arrow::Status::OK();
+  }
+
+  /// Receive an uploaded stream and write it to the Parquet file named by the
+  /// upload's descriptor. The whole stream is read into a Table before
+  /// anything is written. No metadata is sent back to the client.
+  arrow::Status DoPut(const arrow::flight::ServerCallContext&,
+                      std::unique_ptr<arrow::flight::FlightMessageReader> reader,
+                      std::unique_ptr<arrow::flight::FlightMetadataWriter>) override {
+    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(reader->descriptor()));
+    ARROW_ASSIGN_OR_RAISE(auto sink, root_->OpenOutputStream(file_info.path()));
+    std::shared_ptr<arrow::Table> table;
+    ARROW_RETURN_NOT_OK(reader->ReadAll(&table));
+
+    ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
+                                                   sink, /*chunk_size=*/65536));
+    return arrow::Status::OK();
+  }
+
+  /// Stream back the dataset whose file name is given by the ticket. The
+  /// Parquet file is read fully into memory before streaming starts.
+  arrow::Status DoGet(const arrow::flight::ServerCallContext&,
+                      const arrow::flight::Ticket& request,
+                      std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
+    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(request.ticket));
+    std::unique_ptr<parquet::arrow::FileReader> reader;
+    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
+                                                 arrow::default_memory_pool(), &reader));
+
+    std::shared_ptr<arrow::Table> table;
+    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
+    // Note that we can't directly pass TableBatchReader to
+    // RecordBatchStream because TableBatchReader keeps a non-owning
+    // reference to the underlying Table, which would then get freed
+    // when we exit this function
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+    arrow::TableBatchReader batch_reader(*table);
+    ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
+
+    ARROW_ASSIGN_OR_RAISE(auto owning_reader, arrow::RecordBatchReader::Make(
+                                                  std::move(batches), table->schema()));
+    *stream = std::unique_ptr<arrow::flight::FlightDataStream>(
+        new arrow::flight::RecordBatchStream(owning_reader));
+
+    return arrow::Status::OK();
+  }
+
+  /// Advertise the actions this service implements (only "drop_dataset").
+  arrow::Status ListActions(const arrow::flight::ServerCallContext&,
+                            std::vector<arrow::flight::ActionType>* actions) override {
+    *actions = {kActionDropDataset};
+    return arrow::Status::OK();
+  }
+
+  /// Execute a custom action. "drop_dataset" deletes the file named by the
+  /// action body and yields an empty result stream; any other action type
+  /// returns Status::NotImplemented.
+  arrow::Status DoAction(const arrow::flight::ServerCallContext&,
+                         const arrow::flight::Action& action,
+                         std::unique_ptr<arrow::flight::ResultStream>* result) override {
+    if (action.type != kActionDropDataset.type) {
+      return arrow::Status::NotImplemented("Unknown action type: ", action.type);
+    }
+    // Guard against an Action that was built without a body instead of
+    // dereferencing a null buffer below.
+    if (!action.body) {
+      return arrow::Status::Invalid("Action '", action.type,
+                                    "' requires a body naming the dataset");
+    }
+    *result = std::unique_ptr<arrow::flight::ResultStream>(
+        new arrow::flight::SimpleResultStream({}));
+    return DoActionDropDataset(action.body->ToString());
+  }
+
+ private:
+  /// Build the FlightInfo for one stored Parquet file: its Arrow schema, a
+  /// PATH descriptor and ticket both set to the file's base name, a single
+  /// endpoint at localhost on this server's port, the row count taken from
+  /// the Parquet metadata, and the file size as the byte count.
+  arrow::Result<arrow::flight::FlightInfo> MakeFlightInfo(
+      const arrow::fs::FileInfo& file_info) {
+    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(file_info));
+    std::unique_ptr<parquet::arrow::FileReader> reader;
+    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
+                                                 arrow::default_memory_pool(), &reader));
+
+    std::shared_ptr<arrow::Schema> schema;
+    ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
+
+    auto descriptor = arrow::flight::FlightDescriptor::Path({file_info.base_name()});
+
+    arrow::flight::FlightEndpoint endpoint;
+    endpoint.ticket.ticket = file_info.base_name();
+    arrow::flight::Location location;
+    ARROW_RETURN_NOT_OK(
+        arrow::flight::Location::ForGrpcTcp("localhost", port(), &location));
+    endpoint.locations.push_back(location);
+
+    int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
+    int64_t total_bytes = file_info.size();
+
+    return arrow::flight::FlightInfo::Make(*schema, descriptor, {endpoint}, total_records,
+                                           total_bytes);
+  }
+
+  /// Map a descriptor to the FileInfo of the file it names. Only PATH
+  /// descriptors with exactly one component are accepted; anything else
+  /// yields Status::Invalid.
+  arrow::Result<arrow::fs::FileInfo> FileInfoFromDescriptor(
+      const arrow::flight::FlightDescriptor& descriptor) {
+    if (descriptor.type != arrow::flight::FlightDescriptor::PATH) {
+      return arrow::Status::Invalid("Must provide PATH-type FlightDescriptor");
+    } else if (descriptor.path.size() != 1) {
+      return arrow::Status::Invalid(
+          "Must provide PATH-type FlightDescriptor with one path component");
+    }
+    return root_->GetFileInfo(descriptor.path[0]);
+  }
+
+  /// Delete the file named `key` from the storage filesystem.
+  arrow::Status DoActionDropDataset(const std::string& key) {
+    return root_->DeleteFile(key);
+  }
+
+  /// Filesystem in which all datasets are stored.
+  std::shared_ptr<arrow::fs::FileSystem> root_;
+}; // end ParquetStorageService
+
+/// End-to-end walkthrough of the storage service: start a server, connect a
+/// client, upload a Parquet dataset, fetch its metadata and data, delete it,
+/// list what remains, and shut the server down.
+///
+/// Each StartRecipe()/EndRecipe() pair delimits one snippet that the
+/// documentation pulls in by name. Returns the first non-OK status hit.
+arrow::Status TestPutGetDelete() {
+  StartRecipe("ParquetStorageService::StartServer");
+  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
+  ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
+  ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
+  auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);
+
+  arrow::flight::Location server_location;
+  ARROW_RETURN_NOT_OK(
+      arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &server_location));
+
+  arrow::flight::FlightServerOptions options(server_location);
+  auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
+      new ParquetStorageService(std::move(root)));
+  ARROW_RETURN_NOT_OK(server->Init(options));
+  rout << "Listening on port " << server->port() << std::endl;
+  EndRecipe("ParquetStorageService::StartServer");
+
+  StartRecipe("ParquetStorageService::Connect");
+  arrow::flight::Location location;
+  ARROW_RETURN_NOT_OK(
+      arrow::flight::Location::ForGrpcTcp("localhost", server->port(), &location));
+
+  std::unique_ptr<arrow::flight::FlightClient> client;
+  ARROW_RETURN_NOT_OK(arrow::flight::FlightClient::Connect(location, &client));
+  rout << "Connected to " << location.ToString() << std::endl;
+  EndRecipe("ParquetStorageService::Connect");
+
+  StartRecipe("ParquetStorageService::DoPut");
+  // Open example data file to upload
+  ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
+                        FindTestDataFile("airquality.parquet"));
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
+                        fs->OpenInputFile(airquality_path));
+  std::unique_ptr<parquet::arrow::FileReader> reader;
+  ARROW_RETURN_NOT_OK(
+      parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool(), &reader));
+
+  auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
+  std::shared_ptr<arrow::Schema> schema;
+  ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
+
+  // Start the RPC call
+  std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
+  std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
+  ARROW_RETURN_NOT_OK(client->DoPut(descriptor, schema, &writer, &metadata_reader));
+
+  // Upload data
+  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
+  std::vector<int> row_groups(reader->num_row_groups());
+  std::iota(row_groups.begin(), row_groups.end(), 0);
+  ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
+  int64_t batches = 0;
+  while (true) {
+    ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
+    if (!batch) break;
+    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+    batches++;
+  }
+
+  ARROW_RETURN_NOT_OK(writer->Close());
+  rout << "Wrote " << batches << " batches" << std::endl;
+  EndRecipe("ParquetStorageService::DoPut");
+
+  StartRecipe("ParquetStorageService::GetFlightInfo");
+  std::unique_ptr<arrow::flight::FlightInfo> flight_info;
+  ARROW_RETURN_NOT_OK(client->GetFlightInfo(descriptor, &flight_info));
+  rout << flight_info->descriptor().ToString() << std::endl;
+  rout << "=== Schema ===" << std::endl;
+  std::shared_ptr<arrow::Schema> info_schema;
+  arrow::ipc::DictionaryMemo dictionary_memo;
+  ARROW_RETURN_NOT_OK(flight_info->GetSchema(&dictionary_memo, &info_schema));
+  rout << info_schema->ToString() << std::endl;
+  rout << "==============" << std::endl;
+  EndRecipe("ParquetStorageService::GetFlightInfo");
+
+  StartRecipe("ParquetStorageService::DoGet");
+  std::unique_ptr<arrow::flight::FlightStreamReader> stream;
+  ARROW_RETURN_NOT_OK(client->DoGet(flight_info->endpoints()[0].ticket, &stream));
+  std::shared_ptr<arrow::Table> table;
+  ARROW_RETURN_NOT_OK(stream->ReadAll(&table));
+  arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
+  // PrettyPrint reports failures through its Status; propagate it like every
+  // other fallible call here.
+  ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
+  EndRecipe("ParquetStorageService::DoGet");
+
+  StartRecipe("ParquetStorageService::DoAction");
+  arrow::flight::Action action{"drop_dataset",
+                               arrow::Buffer::FromString("airquality.parquet")};
+  std::unique_ptr<arrow::flight::ResultStream> results;
+  ARROW_RETURN_NOT_OK(client->DoAction(action, &results));
+  rout << "Deleted dataset" << std::endl;
+  EndRecipe("ParquetStorageService::DoAction");
+
+  StartRecipe("ParquetStorageService::ListFlights");
+  std::unique_ptr<arrow::flight::FlightListing> listing;
+  ARROW_RETURN_NOT_OK(client->ListFlights(&listing));
+  while (true) {
+    std::unique_ptr<arrow::flight::FlightInfo> flight_info;
+    ARROW_RETURN_NOT_OK(listing->Next(&flight_info));
+    if (!flight_info) break;
+    rout << flight_info->descriptor().ToString() << std::endl;
+    rout << "=== Schema ===" << std::endl;
+    std::shared_ptr<arrow::Schema> info_schema;
+    arrow::ipc::DictionaryMemo dictionary_memo;
+    ARROW_RETURN_NOT_OK(flight_info->GetSchema(&dictionary_memo, &info_schema));
+    rout << info_schema->ToString() << std::endl;
+    rout << "==============" << std::endl;
+  }
+  rout << "End of listing" << std::endl;
+  EndRecipe("ParquetStorageService::ListFlights");
+
+  StartRecipe("ParquetStorageService::StopServer");
+  ARROW_RETURN_NOT_OK(server->Shutdown());
+  rout << "Server shut down successfully" << std::endl;
+  EndRecipe("ParquetStorageService::StopServer");
+  return arrow::Status::OK();
+}
+
+// Runs the whole put/get/delete walkthrough as a single test case and fails
+// if any step reported a non-OK status.
+TEST(ParquetStorageServiceTest, PutGetDelete) {
+  ASSERT_OK(TestPutGetDelete());
+}
diff --git a/cpp/source/flight.rst b/cpp/source/flight.rst
new file mode 100644
index 0000000..74447ac
--- /dev/null
+++ b/cpp/source/flight.rst
@@ -0,0 +1,85 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============
+Arrow Flight
+============
+
+This section contains a number of recipes for working with Arrow
+Flight, an RPC library specialized for tabular datasets. For more
+about Flight, see :doc:`format/Flight`.
+
+.. contents::
+
+Simple Parquet storage service with Arrow Flight
+================================================
+
+We'll implement a service that provides a key-value store for tabular
+data, using Flight to handle uploads/requests and Parquet to store the
+actual data.
+
+First, we'll implement the service itself. For simplicity, we'll
+use the Parquet API directly rather than the :doc:`Datasets <./datasets>`
+API.
+
+.. literalinclude:: ../code/flight.cc
+ :language: cpp
+ :linenos:
+ :start-at: class ParquetStorageService
+ :end-at: end ParquetStorageService
+ :caption: Parquet storage service, server implementation
+
+Next, we'll start our server:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::StartServer
+ :dedent: 2
+
+We can then create a client and connect to the server:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::Connect
+ :dedent: 2
+
+Now we'll create and upload a table, which the server will store in a
+Parquet file.
+
+.. recipe:: ../code/flight.cc ParquetStorageService::DoPut
+ :dedent: 2
+
+Once we do so, we can retrieve the metadata for that dataset:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::GetFlightInfo
+ :dedent: 2
+
+And get the data back:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::DoGet
+ :dedent: 2
+
+Then, we'll delete the dataset:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::DoAction
+ :dedent: 2
+
+And confirm that it's been deleted:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::ListFlights
+ :dedent: 2
+
+Finally, we'll stop our server:
+
+.. recipe:: ../code/flight.cc ParquetStorageService::StopServer
+ :dedent: 2
diff --git a/cpp/source/index.rst b/cpp/source/index.rst
index dc075ba..35e7bcd 100644
--- a/cpp/source/index.rst
+++ b/cpp/source/index.rst
@@ -30,6 +30,7 @@ serve as robust and well performing solutions to those tasks.
basic
create
datasets
+ flight
Indices and tables
==================