You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/01 13:24:17 UTC
[arrow] branch master updated: ARROW-15927: [C++][Skyhook] Add skyhook example (#12620)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 546c3771a2 ARROW-15927: [C++][Skyhook] Add skyhook example (#12620)
546c3771a2 is described below
commit 546c3771a209cbcac5e03cf26e07bcd8c9601d5a
Author: Jayjeet Chakraborty <jc...@rediffmail.com>
AuthorDate: Mon Aug 1 06:24:11 2022 -0700
ARROW-15927: [C++][Skyhook] Add skyhook example (#12620)
Add an example for the `SkyhookFileFormat` extension of Dataset API.
Authored-by: Jayjeet Chakraborty <jc...@rediffmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
cpp/examples/arrow/CMakeLists.txt | 12 ++
cpp/examples/arrow/dataset_skyhook_scan_example.cc | 184 +++++++++++++++++++++
cpp/src/skyhook/CMakeLists.txt | 10 +-
cpp/src/skyhook/protocol/rados_protocol.cc | 5 +-
cpp/src/skyhook/skyhook.pc.in | 2 +-
.../cpp/examples/dataset_skyhook_scan_example.rst | 93 +++++++++++
6 files changed, 299 insertions(+), 7 deletions(-)
diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt
index 0514bf9127..88b760e397 100644
--- a/cpp/examples/arrow/CMakeLists.txt
+++ b/cpp/examples/arrow/CMakeLists.txt
@@ -138,4 +138,16 @@ if(ARROW_PARQUET AND ARROW_DATASET)
add_arrow_example(udf_example)
+ if(ARROW_SKYHOOK)
+ if(ARROW_BUILD_SHARED)
+ list(APPEND DATASET_EXAMPLES_LINK_LIBS arrow_skyhook_shared)
+ else()
+ list(APPEND DATASET_EXAMPLES_LINK_LIBS arrow_skyhook_static)
+ endif()
+
+ add_arrow_example(dataset_skyhook_scan_example EXTRA_LINK_LIBS
+ ${DATASET_EXAMPLES_LINK_LIBS})
+ add_dependencies(dataset-skyhook-scan-example parquet)
+ endif()
+
endif()
diff --git a/cpp/examples/arrow/dataset_skyhook_scan_example.cc b/cpp/examples/arrow/dataset_skyhook_scan_example.cc
new file mode 100644
index 0000000000..2d391723bd
--- /dev/null
+++ b/cpp/examples/arrow/dataset_skyhook_scan_example.cc
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/discovery.h>
+#include <arrow/dataset/file_base.h>
+#include <arrow/dataset/scanner.h>
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/path_util.h>
+#include <skyhook/client/file_skyhook.h>
+
+#include <cstdlib>
+#include <iostream>
+
+using arrow::field;
+using arrow::int16;
+using arrow::Schema;
+using arrow::Table;
+
+namespace fs = arrow::fs;
+
+namespace ds = arrow::dataset;
+
+namespace cp = arrow::compute;
+
+struct Configuration {
+ // Indicates if the Scanner::ToTable should consume in parallel.
+ bool use_threads = true;
+
+ // Indicates to the Scan operator which columns are requested. This
+ // optimization avoids deserializing unneeded columns.
+ std::vector<std::string> projected_columns = {"total_amount"};
+
+ // Indicates the filter by which rows will be filtered. This optimization can
+ // make use of partition information and/or file metadata if possible.
+ cp::Expression filter = cp::greater(cp::field_ref("payment_type"), cp::literal(1));
+
+ ds::InspectOptions inspect_options{};
+ ds::FinishOptions finish_options{};
+} kConf;
+
+arrow::Result<std::shared_ptr<ds::Dataset>> GetDatasetFromDirectory(
+ std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::FileFormat> format,
+ std::string dir) {
+ // Find all files under `dir`
+ fs::FileSelector s;
+ s.base_dir = dir;
+ s.recursive = true;
+
+ // Set partitioning strategy
+ ds::FileSystemFactoryOptions options;
+ options.partitioning = std::make_shared<ds::HivePartitioning>(
+ arrow::schema({arrow::field("payment_type", arrow::int32()),
+ arrow::field("VendorID", arrow::int32())}));
+
+ // The factory will try to build a dataset.
+ ARROW_ASSIGN_OR_RAISE(auto factory,
+ ds::FileSystemDatasetFactory::Make(fs, s, format, options));
+
+ // Try to infer a common schema for all files.
+ ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect(kConf.inspect_options));
+ // Caller can optionally decide another schema as long as it is compatible
+ // with the previous one, e.g. `factory->Finish(compatible_schema)`.
+ ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish(kConf.finish_options));
+
+ return dataset;
+}
+
+arrow::Result<std::shared_ptr<ds::Dataset>> GetDatasetFromFile(
+ std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::FileFormat> format,
+ std::string file) {
+ ds::FileSystemFactoryOptions options;
+ // The factory will try to build a dataset.
+ ARROW_ASSIGN_OR_RAISE(auto factory,
+ ds::FileSystemDatasetFactory::Make(fs, {file}, format, options));
+
+ // Try to infer a common schema for all files.
+ ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect(kConf.inspect_options));
+ // Caller can optionally decide another schema as long as it is compatible
+ // with the previous one, e.g. `factory->Finish(compatible_schema)`.
+ ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish(kConf.finish_options));
+
+ return dataset;
+}
+
+arrow::Result<std::shared_ptr<ds::Dataset>> GetDatasetFromPath(
+ std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::FileFormat> format,
+ std::string path) {
+ ARROW_ASSIGN_OR_RAISE(auto info, fs->GetFileInfo(path));
+ if (info.IsDirectory()) {
+ return GetDatasetFromDirectory(fs, format, path);
+ }
+ return GetDatasetFromFile(fs, format, path);
+}
+
+arrow::Result<std::shared_ptr<ds::Scanner>> GetScannerFromDataset(
+ std::shared_ptr<ds::Dataset> dataset, std::vector<std::string> columns,
+ cp::Expression filter, bool use_threads) {
+ ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
+
+ if (!columns.empty()) {
+ ARROW_RETURN_NOT_OK(scanner_builder->Project(columns));
+ }
+
+ ARROW_RETURN_NOT_OK(scanner_builder->Filter(filter));
+
+ ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(use_threads));
+
+ return scanner_builder->Finish();
+}
+
+arrow::Result<std::shared_ptr<skyhook::SkyhookFileFormat>> InstantiateSkyhookFormat() {
+ // Path to the Ceph configuration file. It contains cluster wide configuration
+ // and most importantly the connection information to the Ceph cluster.
+ std::string ceph_config_path = "/etc/ceph/ceph.conf";
+
+ // Ceph data pool containing the objects to be scanned.
+ // The default data pool is "cephfs_data".
+ std::string ceph_data_pool = "cephfs_data";
+
+ // The user accessing the Ceph cluster. The default username is "client.admin".
+ std::string ceph_user_name = "client.admin";
+
+ // Cluster name is a unique identifier for a Ceph cluster. It is especially
+ // required when you run multiple Ceph clusters on a multi-site architecture
+ // where the cluster name identifies the Ceph cluster for the
+ // current session. The default cluster name is "ceph".
+ std::string ceph_cluster_name = "ceph";
+
+ // CLS name is used to identify the shared library that needs to be loaded
+ // in the Ceph OSDs when invoking an object class method. For Skyhook, the
+ // library name is "libcls_skyhook.so", and the object class name is "skyhook".
+ std::string ceph_cls_name = "skyhook";
+ std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+ std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+ ceph_user_name, ceph_cluster_name,
+ ceph_cls_name);
+ ARROW_ASSIGN_OR_RAISE(auto format,
+ skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+ return format;
+}
+
+arrow::Status Main(std::string dataset_root) {
+ ARROW_ASSIGN_OR_RAISE(auto format, InstantiateSkyhookFormat());
+ std::string path;
+
+ ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri(dataset_root, &path));
+ ARROW_ASSIGN_OR_RAISE(auto dataset, GetDatasetFromPath(fs, format, path));
+ ARROW_ASSIGN_OR_RAISE(
+ auto scanner, GetScannerFromDataset(dataset, kConf.projected_columns, kConf.filter,
+ kConf.use_threads));
+ ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
+ std::cout << "Table size: " << table->num_rows() << "\n";
+ return arrow::Status::OK();
+}
+
+int main(int argc, char** argv) {
+ if (argc != 2) {
+ // Fake success for CI purposes.
+ return EXIT_SUCCESS;
+ }
+ auto status = Main(argv[1]);
+ if (!status.ok()) {
+ std::cerr << status.ToString() << std::endl;
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
diff --git a/cpp/src/skyhook/CMakeLists.txt b/cpp/src/skyhook/CMakeLists.txt
index 992c467413..0019251bd6 100644
--- a/cpp/src/skyhook/CMakeLists.txt
+++ b/cpp/src/skyhook/CMakeLists.txt
@@ -19,7 +19,7 @@
add_subdirectory(client)
# define the targets to build
-add_custom_target(arrow_skyhook_client)
+add_custom_target(arrow_skyhook)
add_custom_target(cls_skyhook)
# define the dependencies
@@ -34,7 +34,7 @@ set(ARROW_SKYHOOK_CLS_SOURCES cls/cls_skyhook.cc protocol/rados_protocol.cc
protocol/skyhook_protocol.cc)
# define the client library
-add_arrow_lib(arrow_skyhook_client
+add_arrow_lib(arrow_skyhook
PKG_CONFIG_NAME
skyhook
SOURCES
@@ -58,15 +58,15 @@ add_arrow_lib(cls_skyhook
${ARROW_SKYHOOK_LINK_STATIC})
# finish building the project
-add_dependencies(arrow_skyhook_client ${ARROW_SKYHOOK_CLIENT_LIBRARIES})
+add_dependencies(arrow_skyhook ${ARROW_SKYHOOK_CLIENT_LIBRARIES})
add_dependencies(cls_skyhook ${ARROW_SKYHOOK_CLS_LIBRARIES})
# define the test builds
if(ARROW_TEST_LINKAGE STREQUAL "static")
- set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_static arrow_dataset_static
+ set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_static arrow_dataset_static
${ARROW_TEST_STATIC_LINK_LIBS})
else()
- set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_shared arrow_dataset_shared
+ set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_shared arrow_dataset_shared
${ARROW_TEST_SHARED_LINK_LIBS})
endif()
diff --git a/cpp/src/skyhook/protocol/rados_protocol.cc b/cpp/src/skyhook/protocol/rados_protocol.cc
index cb1acec1fa..eb520787e7 100644
--- a/cpp/src/skyhook/protocol/rados_protocol.cc
+++ b/cpp/src/skyhook/protocol/rados_protocol.cc
@@ -58,7 +58,9 @@ arrow::Status RadosInterface::init2(const char* const name, const char* const cl
arrow::Status RadosInterface::ioctx_create(const char* name, IoCtxInterface* pioctx) {
librados::IoCtx ioCtx;
int ret = cluster->ioctx_create(name, ioCtx);
- pioctx->setIoCtx(&ioCtx);
+ if (!ret) {
+ pioctx->setIoCtx(&ioCtx);
+ }
return GetStatusFromReturnCode(ret, "rados->ioctx_create failed.");
}
@@ -85,6 +87,7 @@ arrow::Status RadosConn::Connect() {
ARROW_RETURN_NOT_OK(rados->conf_read_file(ctx->ceph_config_path.c_str()));
ARROW_RETURN_NOT_OK(rados->connect());
ARROW_RETURN_NOT_OK(rados->ioctx_create(ctx->ceph_data_pool.c_str(), io_ctx.get()));
+ connected = true;
return arrow::Status::OK();
}
diff --git a/cpp/src/skyhook/skyhook.pc.in b/cpp/src/skyhook/skyhook.pc.in
index 8f7acfa979..5568d63c56 100644
--- a/cpp/src/skyhook/skyhook.pc.in
+++ b/cpp/src/skyhook/skyhook.pc.in
@@ -23,4 +23,4 @@ Name: Skyhook
Description: Skyhook is a plugin for offloading computations into Ceph.
Version: @SKYHOOK_VERSION@
Requires: arrow_dataset
-Libs: -L${libdir} -larrow_skyhook_client
+Libs: -L${libdir} -larrow_skyhook
diff --git a/docs/source/cpp/examples/dataset_skyhook_scan_example.rst b/docs/source/cpp/examples/dataset_skyhook_scan_example.rst
new file mode 100644
index 0000000000..75a3954cf3
--- /dev/null
+++ b/docs/source/cpp/examples/dataset_skyhook_scan_example.rst
@@ -0,0 +1,93 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. default-domain:: cpp
+.. highlight:: cpp
+
+=====================
+Arrow Skyhook example
+=====================
+
+The file ``cpp/examples/arrow/dataset_skyhook_scan_example.cc``
+located inside the source tree contains an example of using Skyhook to
+offload filters and projections to a Ceph cluster.
+
+Instructions
+============
+
+.. note::
+ The instructions below are for Ubuntu 20.04 or above.
+
+1. Install Ceph and Skyhook dependencies.
+
+ .. code-block:: bash
+
+ apt update
+ apt install -y cmake \
+ libradospp-dev \
+ rados-objclass-dev \
+ ceph \
+ ceph-common \
+ ceph-osd \
+ ceph-mon \
+ ceph-mgr \
+ ceph-mds \
+ rbd-mirror \
+ ceph-fuse \
+ rapidjson-dev \
+ libboost-all-dev \
+ python3-pip
+
+2. Build and install Skyhook.
+
+ .. code-block:: bash
+
+ git clone https://github.com/apache/arrow
+ cd arrow/
+ mkdir -p cpp/release
+ cd cpp/release
+ cmake -DARROW_SKYHOOK=ON \
+ -DARROW_PARQUET=ON \
+ -DARROW_WITH_SNAPPY=ON \
+ -DARROW_BUILD_EXAMPLES=ON \
+ -DARROW_DATASET=ON \
+ -DARROW_CSV=ON \
+ -DARROW_WITH_LZ4=ON \
+ ..
+
+ make -j install
+ cp release/libcls_skyhook.so /usr/lib/x86_64-linux-gnu/rados-classes/
+
+3. Deploy a Ceph cluster with a single in-memory OSD using `this <https://github.com/uccross/skyhookdm/blob/master/scripts/micro-osd.sh>`_ script.
+
+ .. code-block:: bash
+
+ ./micro-osd.sh /tmp/skyhook
+
+4. Generate the example dataset.
+
+ .. code-block:: bash
+
+ pip install pandas pyarrow
+ python3 ../../ci/scripts/generate_dataset.py
+ cp -r nyc /mnt/cephfs/
+
+5. Execute the example.
+
+ .. code-block:: bash
+
+ LD_LIBRARY_PATH=/usr/local/lib release/dataset-skyhook-scan-example file:///mnt/cephfs/nyc