You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/01 13:24:17 UTC

[arrow] branch master updated: ARROW-15927: [C++][Skyhook] Add skyhook example (#12620)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 546c3771a2 ARROW-15927: [C++][Skyhook] Add skyhook example (#12620)
546c3771a2 is described below

commit 546c3771a209cbcac5e03cf26e07bcd8c9601d5a
Author: Jayjeet Chakraborty <jc...@rediffmail.com>
AuthorDate: Mon Aug 1 06:24:11 2022 -0700

    ARROW-15927: [C++][Skyhook] Add skyhook example (#12620)
    
    Add an example for the `SkyhookFileFormat` extension of Dataset API.
    
    Authored-by: Jayjeet Chakraborty <jc...@rediffmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/examples/arrow/CMakeLists.txt                  |  12 ++
 cpp/examples/arrow/dataset_skyhook_scan_example.cc | 184 +++++++++++++++++++++
 cpp/src/skyhook/CMakeLists.txt                     |  10 +-
 cpp/src/skyhook/protocol/rados_protocol.cc         |   5 +-
 cpp/src/skyhook/skyhook.pc.in                      |   2 +-
 .../cpp/examples/dataset_skyhook_scan_example.rst  |  93 +++++++++++
 6 files changed, 299 insertions(+), 7 deletions(-)

diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt
index 0514bf9127..88b760e397 100644
--- a/cpp/examples/arrow/CMakeLists.txt
+++ b/cpp/examples/arrow/CMakeLists.txt
@@ -138,4 +138,16 @@ if(ARROW_PARQUET AND ARROW_DATASET)
 
   add_arrow_example(udf_example)
 
+  if(ARROW_SKYHOOK)
+    if(ARROW_BUILD_SHARED)
+      list(APPEND DATASET_EXAMPLES_LINK_LIBS arrow_skyhook_shared)
+    else()
+      list(APPEND DATASET_EXAMPLES_LINK_LIBS arrow_skyhook_static)
+    endif()
+
+    add_arrow_example(dataset_skyhook_scan_example EXTRA_LINK_LIBS
+                      ${DATASET_EXAMPLES_LINK_LIBS})
+    add_dependencies(dataset-skyhook-scan-example parquet)
+  endif()
+
 endif()
diff --git a/cpp/examples/arrow/dataset_skyhook_scan_example.cc b/cpp/examples/arrow/dataset_skyhook_scan_example.cc
new file mode 100644
index 0000000000..2d391723bd
--- /dev/null
+++ b/cpp/examples/arrow/dataset_skyhook_scan_example.cc
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/discovery.h>
+#include <arrow/dataset/file_base.h>
+#include <arrow/dataset/scanner.h>
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/path_util.h>
+#include <skyhook/client/file_skyhook.h>
+
+#include <cstdlib>
+#include <iostream>
+
+using arrow::field;
+using arrow::int16;
+using arrow::Schema;
+using arrow::Table;
+
+namespace fs = arrow::fs;
+
+namespace ds = arrow::dataset;
+
+namespace cp = arrow::compute;
+
+// Settings for the example scan; `kConf` is the single instance used below.
+struct Configuration {
+  // Whether Scanner::ToTable may consume the dataset in parallel.
+  bool use_threads{true};
+  // Columns requested from the Scan operator, so that unneeded columns
+  // are not deserialized.
+  std::vector<std::string> projected_columns{"total_amount"};
+  // Row filter handed to the scanner. Where possible it can be evaluated
+  // against partition information and/or file metadata.
+  cp::Expression filter =
+      cp::greater(cp::field_ref("payment_type"), cp::literal(1));
+  // Passed to factory->Inspect() and factory->Finish() respectively.
+  ds::InspectOptions inspect_options = {};
+  ds::FinishOptions finish_options = {};
+} kConf;
+
+// Builds a dataset from all files found recursively under `dir`, treating the
+// directory layout as Hive partitioning on `payment_type` and `VendorID`.
+arrow::Result<std::shared_ptr<ds::Dataset>> GetDatasetFromDirectory(
+    std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::FileFormat> format,
+    std::string dir) {
+  // Select everything below `dir`, descending into subdirectories.
+  fs::FileSelector selector;
+  selector.base_dir = dir;
+  selector.recursive = true;
+
+  // Both partition keys are declared as int32 fields.
+  auto partition_schema = arrow::schema({arrow::field("payment_type", arrow::int32()),
+                                         arrow::field("VendorID", arrow::int32())});
+  ds::FileSystemFactoryOptions opts;
+  opts.partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
+
+  // Creating the factory can fail; the error is returned to the caller.
+  ARROW_ASSIGN_OR_RAISE(
+      auto factory, ds::FileSystemDatasetFactory::Make(fs, selector, format, opts));
+
+  // Try to infer a common schema for all files before finishing.
+  ARROW_ASSIGN_OR_RAISE(auto unified_schema, factory->Inspect(kConf.inspect_options));
+  // A caller could instead finish with another compatible schema,
+  // e.g. `factory->Finish(compatible_schema)`.
+  return factory->Finish(kConf.finish_options);
+}
+
+// Builds a dataset from the single file `file`; no partitioning is configured.
+arrow::Result<std::shared_ptr<ds::Dataset>> GetDatasetFromFile(
+    std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::FileFormat> format,
+    std::string file) {
+  // Factory options are left at their defaults.
+  ds::FileSystemFactoryOptions opts;
+  ARROW_ASSIGN_OR_RAISE(auto factory,
+                        ds::FileSystemDatasetFactory::Make(fs, {file}, format, opts));
+
+  // Inspect the schema first; a failure here is returned to the caller.
+  ARROW_ASSIGN_OR_RAISE(auto unified_schema, factory->Inspect(kConf.inspect_options));
+
+  // A caller could instead finish with another compatible schema,
+  // e.g. `factory->Finish(compatible_schema)`.
+  return factory->Finish(kConf.finish_options);
+}
+
+// Dispatches on what `path` refers to: a directory goes through
+// GetDatasetFromDirectory, anything else through GetDatasetFromFile.
+arrow::Result<std::shared_ptr<ds::Dataset>> GetDatasetFromPath(
+    std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::FileFormat> format,
+    std::string path) {
+  ARROW_ASSIGN_OR_RAISE(const auto path_info, fs->GetFileInfo(path));
+  return path_info.IsDirectory() ? GetDatasetFromDirectory(fs, format, path)
+                                 : GetDatasetFromFile(fs, format, path);
+}
+
+// Configures a scanner over `dataset`: projects `columns` (when non-empty),
+// applies `filter`, and sets threading according to `use_threads`.
+arrow::Result<std::shared_ptr<ds::Scanner>> GetScannerFromDataset(
+    std::shared_ptr<ds::Dataset> dataset, std::vector<std::string> columns,
+    cp::Expression filter, bool use_threads) {
+  ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
+  // With an empty column list, Project() is not called at all.
+  const bool has_projection = !columns.empty();
+  if (has_projection) {
+    ARROW_RETURN_NOT_OK(builder->Project(columns));
+  }
+  ARROW_RETURN_NOT_OK(builder->Filter(filter));
+  ARROW_RETURN_NOT_OK(builder->UseThreads(use_threads));
+  return builder->Finish();
+}
+
+// Creates a SkyhookFileFormat (requested with the "parquet" format string)
+// from hard-coded connection settings for the Ceph cluster.
+arrow::Result<std::shared_ptr<skyhook::SkyhookFileFormat>> InstantiateSkyhookFormat() {
+  // Location of the Ceph configuration file, which holds cluster-wide settings
+  // and, most importantly, the information needed to connect to the cluster.
+  std::string config_path{"/etc/ceph/ceph.conf"};
+
+  // Data pool holding the objects to scan ("cephfs_data" is the default pool).
+  std::string data_pool{"cephfs_data"};
+
+  // User that accesses the cluster ("client.admin" is the default username).
+  std::string user_name{"client.admin"};
+
+  // A cluster name is a unique identifier for a Ceph cluster. It matters most
+  // in multi-site setups running several clusters, where it selects the
+  // cluster for the current session. The default cluster name is "ceph".
+  std::string cluster_name{"ceph"};
+
+  // The CLS name identifies the shared library that the Ceph OSDs load when an
+  // object class method is invoked. For Skyhook the library is
+  // "libcls_skyhook.so" and the object class name is "skyhook".
+  std::string cls_name{"skyhook"};
+
+  auto rados_ctx = std::make_shared<skyhook::RadosConnCtx>(
+      config_path, data_pool, user_name, cluster_name, cls_name);
+  // Make() returns a Result, so a failure is propagated to the caller.
+  ARROW_ASSIGN_OR_RAISE(auto skyhook_format,
+                        skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
+  return skyhook_format;
+}
+
+// Scans `dataset_root` via Skyhook with the `kConf` settings and prints the row count.
+arrow::Status Main(std::string dataset_root) {
+  ARROW_ASSIGN_OR_RAISE(auto skyhook_fmt, InstantiateSkyhookFormat());
+  std::string root;  // receives the path component of the URI
+  ARROW_ASSIGN_OR_RAISE(auto filesys, fs::FileSystemFromUri(dataset_root, &root));
+  ARROW_ASSIGN_OR_RAISE(auto dataset, GetDatasetFromPath(filesys, skyhook_fmt, root));
+  ARROW_ASSIGN_OR_RAISE(auto scanner,
+                        GetScannerFromDataset(dataset, kConf.projected_columns,
+                                              kConf.filter, kConf.use_threads));
+  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
+  std::cout << "Table size: " << table->num_rows() << "\n";
+  return arrow::Status::OK();
+}
+
+// Entry point: expects exactly one argument, the URI of the dataset root.
+int main(int argc, char** argv) {
+  if (argc != 2) {
+    // Success is still reported on purpose (fake success for CI), but tell a
+    // human caller what was expected instead of exiting silently.
+    std::cerr << "Usage: dataset-skyhook-scan-example <dataset_uri>" << std::endl;
+    return EXIT_SUCCESS;
+  }
+  const arrow::Status status = Main(argv[1]);
+  if (!status.ok()) std::cerr << status.ToString() << std::endl;
+  return status.ok() ? EXIT_SUCCESS : EXIT_FAILURE;
+}
diff --git a/cpp/src/skyhook/CMakeLists.txt b/cpp/src/skyhook/CMakeLists.txt
index 992c467413..0019251bd6 100644
--- a/cpp/src/skyhook/CMakeLists.txt
+++ b/cpp/src/skyhook/CMakeLists.txt
@@ -19,7 +19,7 @@
 add_subdirectory(client)
 
 # define the targets to build
-add_custom_target(arrow_skyhook_client)
+add_custom_target(arrow_skyhook)
 add_custom_target(cls_skyhook)
 
 # define the dependencies
@@ -34,7 +34,7 @@ set(ARROW_SKYHOOK_CLS_SOURCES cls/cls_skyhook.cc protocol/rados_protocol.cc
                               protocol/skyhook_protocol.cc)
 
 # define the client library
-add_arrow_lib(arrow_skyhook_client
+add_arrow_lib(arrow_skyhook
               PKG_CONFIG_NAME
               skyhook
               SOURCES
@@ -58,15 +58,15 @@ add_arrow_lib(cls_skyhook
               ${ARROW_SKYHOOK_LINK_STATIC})
 
 # finish building the project
-add_dependencies(arrow_skyhook_client ${ARROW_SKYHOOK_CLIENT_LIBRARIES})
+add_dependencies(arrow_skyhook ${ARROW_SKYHOOK_CLIENT_LIBRARIES})
 add_dependencies(cls_skyhook ${ARROW_SKYHOOK_CLS_LIBRARIES})
 
 # define the test builds
 if(ARROW_TEST_LINKAGE STREQUAL "static")
-  set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_static arrow_dataset_static
+  set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_static arrow_dataset_static
                                    ${ARROW_TEST_STATIC_LINK_LIBS})
 else()
-  set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_shared arrow_dataset_shared
+  set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_shared arrow_dataset_shared
                                    ${ARROW_TEST_SHARED_LINK_LIBS})
 endif()
 
diff --git a/cpp/src/skyhook/protocol/rados_protocol.cc b/cpp/src/skyhook/protocol/rados_protocol.cc
index cb1acec1fa..eb520787e7 100644
--- a/cpp/src/skyhook/protocol/rados_protocol.cc
+++ b/cpp/src/skyhook/protocol/rados_protocol.cc
@@ -58,7 +58,9 @@ arrow::Status RadosInterface::init2(const char* const name, const char* const cl
 arrow::Status RadosInterface::ioctx_create(const char* name, IoCtxInterface* pioctx) {
   librados::IoCtx ioCtx;
   int ret = cluster->ioctx_create(name, ioCtx);
-  pioctx->setIoCtx(&ioCtx);
+  if (!ret) {
+    pioctx->setIoCtx(&ioCtx);
+  }
   return GetStatusFromReturnCode(ret, "rados->ioctx_create failed.");
 }
 
@@ -85,6 +87,7 @@ arrow::Status RadosConn::Connect() {
   ARROW_RETURN_NOT_OK(rados->conf_read_file(ctx->ceph_config_path.c_str()));
   ARROW_RETURN_NOT_OK(rados->connect());
   ARROW_RETURN_NOT_OK(rados->ioctx_create(ctx->ceph_data_pool.c_str(), io_ctx.get()));
+  connected = true;
   return arrow::Status::OK();
 }
 
diff --git a/cpp/src/skyhook/skyhook.pc.in b/cpp/src/skyhook/skyhook.pc.in
index 8f7acfa979..5568d63c56 100644
--- a/cpp/src/skyhook/skyhook.pc.in
+++ b/cpp/src/skyhook/skyhook.pc.in
@@ -23,4 +23,4 @@ Name: Skyhook
 Description: Skyhook is a plugin for offloading computations into Ceph.
 Version: @SKYHOOK_VERSION@
 Requires: arrow_dataset
-Libs: -L${libdir} -larrow_skyhook_client
+Libs: -L${libdir} -larrow_skyhook
diff --git a/docs/source/cpp/examples/dataset_skyhook_scan_example.rst b/docs/source/cpp/examples/dataset_skyhook_scan_example.rst
new file mode 100644
index 0000000000..75a3954cf3
--- /dev/null
+++ b/docs/source/cpp/examples/dataset_skyhook_scan_example.rst
@@ -0,0 +1,93 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. default-domain:: cpp
+.. highlight:: cpp
+
+=====================
+Arrow Skyhook example
+=====================
+
+The file ``cpp/examples/arrow/dataset_skyhook_scan_example.cc``
+located inside the source tree contains an example of using Skyhook to
+offload filters and projections to a Ceph cluster.
+
+Instructions
+============
+
+.. note::
+   The instructions below are for Ubuntu 20.04 or above.
+
+1. Install Ceph and Skyhook dependencies.
+
+   .. code-block:: bash
+
+      apt update
+      apt install -y cmake \
+                     libradospp-dev \
+                     rados-objclass-dev \
+                     ceph \
+                     ceph-common \
+                     ceph-osd \
+                     ceph-mon \
+                     ceph-mgr \
+                     ceph-mds \
+                     rbd-mirror \
+                     ceph-fuse \
+                     rapidjson-dev \
+                     libboost-all-dev \
+                     python3-pip
+
+2. Build and install Skyhook.
+
+   .. code-block:: bash
+
+      git clone https://github.com/apache/arrow
+      cd arrow/
+      mkdir -p cpp/release
+      cd cpp/release
+      cmake -DARROW_SKYHOOK=ON \
+            -DARROW_PARQUET=ON \
+            -DARROW_WITH_SNAPPY=ON \
+            -DARROW_BUILD_EXAMPLES=ON \
+            -DARROW_DATASET=ON \
+            -DARROW_CSV=ON \
+            -DARROW_WITH_LZ4=ON \
+            ..
+
+      make -j install
+      cp release/libcls_skyhook.so /usr/lib/x86_64-linux-gnu/rados-classes/
+
+3. Deploy a Ceph cluster with a single in-memory OSD using `this <https://github.com/uccross/skyhookdm/blob/master/scripts/micro-osd.sh>`_ script.
+
+   .. code-block:: bash
+
+      ./micro-osd.sh /tmp/skyhook
+
+4. Generate the example dataset.
+
+   .. code-block:: bash
+
+      pip install pandas pyarrow
+      python3 ../../ci/scripts/generate_dataset.py
+      cp -r nyc /mnt/cephfs/
+
+5. Execute the example.
+
+   .. code-block:: bash
+
+      LD_LIBRARY_PATH=/usr/local/lib release/dataset-skyhook-scan-example file:///mnt/cephfs/nyc