You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/19 01:53:55 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #10913: ARROW-13607: [C++] Add Skyhook to Arrow

westonpace commented on a change in pull request #10913:
URL: https://github.com/apache/arrow/pull/10913#discussion_r687195979



##########
File path: ci/scripts/integration_skyhook.sh
##########
@@ -0,0 +1,134 @@
+#!/usr/bin/env bash

Review comment:
       This is just a warning, no action required, but this file requires a fair amount of Ceph knowledge to understand (more than I have).  This will limit the number of people who are able to maintain it.  I don't know if it would be easy, but is there any "standard ceph server install" (a maintained docker image or something) that could be used?

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();

Review comment:
       ```suggestion
     ARROW_EXPECT_OK(format->Init());
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  auto scanner_builder = dataset->NewScan().ValueOrDie();
+
+  if (!columns.empty()) {
+    scanner_builder->Project(columns);
+  }
+
+  scanner_builder->Filter(filter);
+  scanner_builder->UseThreads(use_threads);

Review comment:
       ```suggestion
     ARROW_EXPECT_OK(scanner_builder->UseThreads(use_threads));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  auto scanner_builder = dataset->NewScan().ValueOrDie();
+
+  if (!columns.empty()) {
+    scanner_builder->Project(columns);
+  }
+
+  scanner_builder->Filter(filter);
+  scanner_builder->UseThreads(use_threads);
+  return scanner_builder->Finish().ValueOrDie();

Review comment:
       ```suggestion
     EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
     return scanner;
   ```

##########
File path: cpp/src/skyhook/protocol/rados_protocol.h
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <rados/librados.hpp>
+
+#include "arrow/status.h"
+
+#include "skyhook/client/file_skyhook.h"
+
+namespace skyhook {
+namespace rados {
+
+/// Wrap Arrow Status with a custom return code.
+class RadosStatus {
+ public:
+  RadosStatus(arrow::Status s, int code) : s_(s), code_(code) {}
+  arrow::Status status() { return s_; }
+  int code() { return code_; }
+
+ private:
+  arrow::Status s_;
+  int code_;
+};
+
+class IoCtxInterface {
+ public:
+  IoCtxInterface() {}
+
+  /// \brief Write data to an object.
+  ///
+  /// \param[in] oid the ID of the object to write.
+  /// \param[in] bl a bufferlist containing the data to write to the object.
+  virtual RadosStatus write_full(const std::string& oid, ceph::bufferlist& bl) = 0;

Review comment:
       Nit: Is this method used anywhere?

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      ceph::bufferlist* bl = new ceph::bufferlist();
+      cls_cxx_read(hctx_, position, nbytes, bl);
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  arrow::Status Close() {
+    closed_ = true;
+    for (auto chunk : chunks_) {
+      delete chunk;
+    }
+    return arrow::Status::OK();
+  }
+
+  bool closed() const { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<ceph::bufferlist*> chunks_;
+};
+
+/// \brief  Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to customize the
+/// scan. \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options) {
+  auto file = std::make_shared<RandomAccessObject>(hctx, req.file_size);
+  auto source = std::make_shared<arrow::dataset::FileSource>(file);
+  ARROW_ASSIGN_OR_RAISE(auto fragment,
+                        format->MakeFragment(*source, req.partition_expression));
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  auto builder = std::make_shared<arrow::dataset::ScannerBuilder>(req.dataset_schema,
+                                                                  fragment, options);

Review comment:
       ```suggestion
                                                                     std::move(fragment), std::move(options));
   ```

##########
File path: cpp/src/skyhook/protocol/rados_protocol.cc
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/rados_protocol.h"
+
+#include <iostream>
+#include <vector>
+
+namespace skyhook {
+namespace rados {
+
+RadosStatus GetStatusFromReturnCode(int code, std::string msg) {
+  if (code) return RadosStatus(arrow::Status::Invalid(msg), code);
+  return RadosStatus(arrow::Status::OK(), code);
+}
+
+RadosStatus IoCtxWrapper::write_full(const std::string& oid, ceph::bufferlist& bl) {
+  return GetStatusFromReturnCode(this->ioCtx->write_full(oid, bl),

Review comment:
       Nit: It's a little inconsistent that you start using the optional `this->` but only in this file.

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, ScanRequestSerializeDeserialize) {
+  ceph::bufferlist bl;
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  skyhook::SerializeScanRequest(req, bl);
+
+  skyhook::ScanRequest req_;
+  skyhook::DeserializeScanRequest(req_, bl);

Review comment:
       ```suggestion
     ASSERT_OK(skyhook::DeserializeScanRequest(req_, bl));
   ```

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, ScanRequestSerializeDeserialize) {
+  ceph::bufferlist bl;
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  skyhook::SerializeScanRequest(req, bl);

Review comment:
       ```suggestion
     ASSERT_OK(skyhook::SerializeScanRequest(req, bl));
   ```

##########
File path: cpp/src/skyhook/CMakeLists.txt
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitationsn
+# under the License.
+
+#
+# arrow_skyhook
+#
+# define project properties
+project(arrow_skyhook)
+cmake_minimum_required(VERSION 3.11)
+set(ARROW_BUILD_STATIC OFF)
+
+# install skyhook headers
+install(FILES client/file_skyhook.h
+        DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/skyhook/client")
+
+# define the targets to build
+add_custom_target(arrow_skyhook_client)
+add_custom_target(cls_skyhook)
+
+# define the dependencies
+find_package(librados REQUIRED)
+include_directories(${LIBRADOS_INCLUDE_DIR})
+set(ARROW_DATASET_LINK_STATIC arrow_dataset_static)
+set(ARROW_DATASET_LINK_SHARED arrow_dataset_shared)
+set(ARROW_DATASET_LINK_STATIC ${ARROW_DATASET_LINK_STATIC} ${LIBRADOS_LIBRARIES})
+set(ARROW_DATASET_LINK_SHARED ${ARROW_DATASET_LINK_SHARED} ${LIBRADOS_LIBRARIES})
+
+# define the client and cls sources
+set(ARROW_SKYHOOK_CLIENT_SOURCES client/file_skyhook.cc protocol/rados_protocol.cc
+                                 protocol/skyhook_protocol.cc)
+set(ARROW_SKYHOOK_CLS_SOURCES cls/cls_skyhook.cc protocol/rados_protocol.cc
+                              protocol/skyhook_protocol.cc)
+
+# define the client library
+add_arrow_lib(arrow_skyhook_client
+              BUILD_SHARED
+              ON
+              SOURCES
+              ${ARROW_SKYHOOK_CLIENT_SOURCES}
+              OUTPUTS
+              ARROW_SKYHOOK_CLIENT_LIBRARIES
+              SHARED_LINK_LIBS
+              ${ARROW_DATASET_LINK_SHARED}
+              STATIC_LINK_LIBS
+              ${ARROW_DATASET_LINK_STATIC}
+              DEPENDENCIES
+              arrow_dataset_static)

Review comment:
       I had to remove these `DEPENDENCIES` sections to get things to compile for me.  I'm not a CMake expert so I'm not sure what the issue is.  The error was...
   
   ```
   CMake Error at cmake_modules/BuildUtils.cmake:291 (add_dependencies):
     The dependency target "arrow_dataset_static" of target
     "arrow_skyhook_client_objlib" does not exist.
   Call Stack (most recent call first):
     src/skyhook/CMakeLists.txt:49 (add_arrow_lib)
   
   
   CMake Error at cmake_modules/BuildUtils.cmake:291 (add_dependencies):
     The dependency target "arrow_dataset_static" of target "cls_skyhook_objlib"
     does not exist.
   Call Stack (most recent call first):
     src/skyhook/CMakeLists.txt:64 (add_arrow_lib)
   
   ```
   
   I wonder if maybe there is a bootstrapping-type problem where this will only work if Arrow happens to also be installed.

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();

Review comment:
       ```suggestion
     EXPECT_OK_AND_ASSIGN(auto info, fs->GetFileInfo(path));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  auto scanner_builder = dataset->NewScan().ValueOrDie();

Review comment:
       ```suggestion
     EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  auto scanner_builder = dataset->NewScan().ValueOrDie();
+
+  if (!columns.empty()) {
+    scanner_builder->Project(columns);
+  }
+
+  scanner_builder->Filter(filter);
+  scanner_builder->UseThreads(use_threads);
+  return scanner_builder->Finish().ValueOrDie();
+}
+
+TEST(TestSkyhookCLS, SelectEntireDataset) {
+  std::string path;
+  auto fs = GetFileSystemFromUri("file:///mnt/cephfs/nyc", &path);
+  std::vector<std::string> columns;
+
+  auto format = GetParquetFormat();
+  auto dataset = GetDatasetFromPath(fs, format, path);
+  auto scanner =
+      GetScannerFromDataset(dataset, columns, arrow::compute::literal(true), true);
+  auto table_parquet = scanner->ToTable().ValueOrDie();

Review comment:
       Use the `EXPECT_...` and `ASSERT_...` macros instead of `ValueOrDie` for the rest of the file.

##########
File path: cpp/src/arrow/dataset/scanner_internal.h
##########
@@ -185,6 +185,10 @@ inline Result<ScanTaskIterator> GetScanTaskIterator(
   auto fn = [options](std::shared_ptr<Fragment> fragment) -> Result<ScanTaskIterator> {
     ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options));
 
+    if (options->skip_compute) {

Review comment:
       This skips compute in the sync scanner.  The sync scanner is gradually being deprecated and will likely not be the default scanner in 6.0.0.  You will also need to skip compute in the async scanner.  The equivalent spot would probably be `MakeScanNode` in `scanner.cc`.

##########
File path: cpp/src/skyhook/protocol/rados_protocol.h
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <rados/librados.hpp>
+
+#include "arrow/status.h"
+
+#include "skyhook/client/file_skyhook.h"
+
+namespace skyhook {
+namespace rados {
+
+/// Wrap Arrow Status with a custom return code.
+class RadosStatus {
+ public:
+  RadosStatus(arrow::Status s, int code) : s_(s), code_(code) {}
+  arrow::Status status() { return s_; }
+  int code() { return code_; }
+
+ private:
+  arrow::Status s_;
+  int code_;
+};

Review comment:
       Another option is to use `StatusDetail` to encapsulate the `code`.  For an example see https://github.com/apache/arrow/blob/4591d76fce2846a29dac33bf01e9ba0337b118e9/cpp/src/arrow/util/io_util.h#L216

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();

Review comment:
       ```suggestion
     EXPECT_OK_AND_ASSIGN(auto schema, factory->Inspect(inspect_options));
     EXPECT_OK_AND_ASSIGN(auto dataset, factory->Finish(finish_options));
     return dataset;
   ```
   
   I can't see any reason to use `UnionDataset`.  Prefer `EXPECT_...` or `ASSERT_...` variants over `ValueOrDie`.

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  auto scanner_builder = dataset->NewScan().ValueOrDie();
+
+  if (!columns.empty()) {
+    scanner_builder->Project(columns);

Review comment:
       ```suggestion
       ARROW_EXPECT_OK(scanner_builder->Project(columns));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();

Review comment:
       Use `EXPECT_OK_AND_ASSIGN` instead of `ValueOrDie` here.

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      ceph::bufferlist* bl = new ceph::bufferlist();
+      cls_cxx_read(hctx_, position, nbytes, bl);
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  arrow::Status Close() {
+    closed_ = true;
+    for (auto chunk : chunks_) {
+      delete chunk;

Review comment:
       It seems like it would be more idiomatic to extend `Buffer` with a version that has `std::unique_ptr<ceph::bufferlist>`; then the `RandomAccessObject` doesn't need to worry about ownership.  Otherwise you're relying on the callers to stop accessing the buffers after the `RandomAccessObject` is destroyed, which seems unsafe.  See `arrow::io::MemoryMappedFile::MemoryMap::Region` in `src/arrow/io/file.cc` for an example of something similar.

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      ceph::bufferlist* bl = new ceph::bufferlist();
+      cls_cxx_read(hctx_, position, nbytes, bl);
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  arrow::Status Close() {
+    closed_ = true;
+    for (auto chunk : chunks_) {
+      delete chunk;
+    }
+    return arrow::Status::OK();
+  }
+
+  bool closed() const { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<ceph::bufferlist*> chunks_;
+};
+
+/// \brief  Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to customize the
+/// scan. \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options) {
+  auto file = std::make_shared<RandomAccessObject>(hctx, req.file_size);
+  auto source = std::make_shared<arrow::dataset::FileSource>(file);
+  ARROW_ASSIGN_OR_RAISE(auto fragment,
+                        format->MakeFragment(*source, req.partition_expression));
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  auto builder = std::make_shared<arrow::dataset::ScannerBuilder>(req.dataset_schema,
+                                                                  fragment, options);
+
+  ARROW_RETURN_NOT_OK(builder->Filter(req.filter_expression));
+  ARROW_RETURN_NOT_OK(builder->Project(req.projection_schema->field_names()));
+  ARROW_RETURN_NOT_OK(builder->UseThreads(true));
+  ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options));
+
+  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
+  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
+  return table;
+}
+
+/// \brief Scan RADOS objects containing Arrow IPC data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[out] result_table A table to store the resultant data.
+/// \return Status.
+static arrow::Status ScanIpcObject(cls_method_context_t hctx, skyhook::ScanRequest req,
+                                   std::shared_ptr<arrow::Table>* result_table) {
+  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
+  auto fragment_scan_options = std::make_shared<arrow::dataset::IpcFragmentScanOptions>();
+
+  ARROW_ASSIGN_OR_RAISE(*result_table, DoScan(hctx, req, format, fragment_scan_options));

Review comment:
       ```suggestion
     ARROW_ASSIGN_OR_RAISE(*result_table, DoScan(hctx, req, std::move(format), std::move(fragment_scan_options)));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      ceph::bufferlist* bl = new ceph::bufferlist();
+      cls_cxx_read(hctx_, position, nbytes, bl);
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  arrow::Status Close() {
+    closed_ = true;
+    for (auto chunk : chunks_) {
+      delete chunk;
+    }
+    return arrow::Status::OK();
+  }
+
+  bool closed() const { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<ceph::bufferlist*> chunks_;
+};
+
+/// \brief  Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to customize the
+/// scan. \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options) {
+  auto file = std::make_shared<RandomAccessObject>(hctx, req.file_size);
+  auto source = std::make_shared<arrow::dataset::FileSource>(file);
+  ARROW_ASSIGN_OR_RAISE(auto fragment,
+                        format->MakeFragment(*source, req.partition_expression));
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  auto builder = std::make_shared<arrow::dataset::ScannerBuilder>(req.dataset_schema,
+                                                                  fragment, options);
+
+  ARROW_RETURN_NOT_OK(builder->Filter(req.filter_expression));
+  ARROW_RETURN_NOT_OK(builder->Project(req.projection_schema->field_names()));
+  ARROW_RETURN_NOT_OK(builder->UseThreads(true));
+  ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options));
+
+  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
+  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
+  return table;
+}
+
+/// \brief Scan RADOS objects containing Arrow IPC data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[out] result_table A table to store the resultant data.
+/// \return Status.
+static arrow::Status ScanIpcObject(cls_method_context_t hctx, skyhook::ScanRequest req,

Review comment:
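       Consider returning `arrow::Result<std::shared_ptr<arrow::Table>>` directly (as `DoScan` already does) instead of returning a `Status` and passing the table back through a single out parameter.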

##########
File path: cpp/src/skyhook/protocol/ScanRequest.fbs
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// EXPERIMENTAL: Metadata for a Skyhook scan request: the size and format of
+/// the file to scan, plus the filter, partition, dataset schema and projection
+/// schema, each carried as a byte array.
+
+namespace org.apache.arrow.flatbuf;
+
+table ScanRequest {
+  file_size: long;
+  file_format: short;
+  filter: [ubyte];
+  partition: [ubyte];
+  dataset_schema: [ubyte];
+  projection_schema: [ubyte];
+}
+
+root_type ScanRequest;

Review comment:
       ```suggestion
   root_type ScanRequest;
   
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      ceph::bufferlist* bl = new ceph::bufferlist();
+      cls_cxx_read(hctx_, position, nbytes, bl);
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  arrow::Status Close() {
+    closed_ = true;
+    for (auto chunk : chunks_) {
+      delete chunk;
+    }
+    return arrow::Status::OK();
+  }
+
+  bool closed() const { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<ceph::bufferlist*> chunks_;
+};
+
+/// \brief  Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to customize the
+/// scan. \return Table.

Review comment:
       ```suggestion
   /// scan.
   /// \return Table.
   ```
   Or, alternatively, just don't include the `\return` line if there is nothing useful to add.

##########
File path: cpp/src/skyhook/protocol/rados_protocol.cc
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/rados_protocol.h"
+
+#include <iostream>
+#include <vector>
+
+namespace skyhook {
+namespace rados {
+
+RadosStatus GetStatusFromReturnCode(int code, std::string msg) {
+  if (code) return RadosStatus(arrow::Status::Invalid(msg), code);
+  return RadosStatus(arrow::Status::OK(), code);
+}
+
+RadosStatus IoCtxWrapper::write_full(const std::string& oid, ceph::bufferlist& bl) {
+  return GetStatusFromReturnCode(this->ioCtx->write_full(oid, bl),
+                                 "ioctx->write_full failed.");
+}
+
+RadosStatus IoCtxWrapper::read(const std::string& oid, ceph::bufferlist& bl, size_t len,
+                               uint64_t offset) {
+  return GetStatusFromReturnCode(this->ioCtx->read(oid, bl, len, offset),
+                                 "ioctx->read failed.");
+}
+
+RadosStatus IoCtxWrapper::exec(const std::string& oid, const char* cls,
+                               const char* method, ceph::bufferlist& in,
+                               ceph::bufferlist& out) {
+  return GetStatusFromReturnCode(this->ioCtx->exec(oid, cls, method, in, out),
+                                 "ioctx->exec failed.");
+}
+
+RadosStatus IoCtxWrapper::stat(const std::string& oid, uint64_t* psize) {
+  return GetStatusFromReturnCode(this->ioCtx->stat(oid, psize, NULL),
+                                 "ioctx->stat failed.");
+}
+
+std::vector<std::string> IoCtxWrapper::list() {
+  std::vector<std::string> oids;
+  librados::NObjectIterator begin = this->ioCtx->nobjects_begin();
+  librados::NObjectIterator end = this->ioCtx->nobjects_end();
+  for (; begin != end; begin++) {
+    oids.push_back(begin->get_oid());
+  }
+  return oids;
+}
+
+RadosStatus RadosWrapper::init2(const char* const name, const char* const clustername,
+                                uint64_t flags) {
+  return GetStatusFromReturnCode(this->cluster->init2(name, clustername, flags),
+                                 "rados->init failed.");
+}
+
+RadosStatus RadosWrapper::ioctx_create(const char* name, IoCtxInterface* pioctx) {
+  librados::IoCtx ioCtx;
+  int ret = this->cluster->ioctx_create(name, ioCtx);
+  pioctx->setIoCtx(&ioCtx);
+  return GetStatusFromReturnCode(ret, "rados->ioctx_create failed.");
+}
+
+RadosStatus RadosWrapper::conf_read_file(const char* const path) {
+  return GetStatusFromReturnCode(this->cluster->conf_read_file(path),
+                                 "rados->conf_read_file failed.");
+}
+
+RadosStatus RadosWrapper::connect() {
+  return GetStatusFromReturnCode(this->cluster->connect(), "rados->connect failed.");
+}
+
+void RadosWrapper::shutdown() { this->cluster->shutdown(); }
+
+RadosConn::~RadosConn() { Shutdown(); }
+
+arrow::Status RadosConn::Connect() {
+  if (connected) {
+    return arrow::Status::OK();
+  }
+
+  ARROW_RETURN_NOT_OK(
+      rados->init2(ctx->ceph_user_name.c_str(), ctx->ceph_cluster_name.c_str(), 0)
+          .status());
+  ARROW_RETURN_NOT_OK(rados->conf_read_file(ctx->ceph_config_path.c_str()).status());
+  ARROW_RETURN_NOT_OK(rados->connect().status());
+  ARROW_RETURN_NOT_OK(rados->ioctx_create(ctx->ceph_data_pool.c_str(), io_ctx).status());
+  return arrow::Status::OK();
+}
+
+arrow::Status RadosConn::Shutdown() {
+  if (connected) {
+    rados->shutdown();
+    connected = false;
+  }
+  return arrow::Status::OK();
+}

Review comment:
       ```suggestion
   void RadosConn::Shutdown() {
     if (connected) {
       rados->shutdown();
       connected = false;
     }
   }
   ```

##########
File path: cpp/src/skyhook/protocol/rados_protocol.h
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <rados/librados.hpp>
+
+#include "arrow/status.h"
+
+#include "skyhook/client/file_skyhook.h"
+
+namespace skyhook {
+namespace rados {
+
+/// Wrap Arrow Status with a custom return code.
+class RadosStatus {
+ public:
+  RadosStatus(arrow::Status s, int code) : s_(s), code_(code) {}
+  arrow::Status status() { return s_; }
+  int code() { return code_; }

Review comment:
       ```suggestion
     int code() const { return code_; }
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook_test.cc
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+std::shared_ptr<skyhook::SkyhookFileFormat> GetSkyhookFormat() {
+  std::string ceph_config_path = "/etc/ceph/ceph.conf";
+  std::string ceph_data_pool = "cephfs_data";
+  std::string ceph_user_name = "client.admin";
+  std::string ceph_cluster_name = "ceph";
+  std::string ceph_cls_name = "skyhook";
+  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
+      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
+                                              ceph_user_name, ceph_cluster_name,
+                                              ceph_cls_name);
+  auto format = std::make_shared<skyhook::SkyhookFileFormat>(rados_ctx, "parquet");
+  format->Init();
+  return format;
+}
+
+std::shared_ptr<arrow::dataset::ParquetFileFormat> GetParquetFormat() {
+  return std::make_shared<arrow::dataset::ParquetFileFormat>();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string dir) {
+  arrow::fs::FileSelector s;
+  s.base_dir = dir;
+  s.recursive = true;
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      arrow::schema({arrow::field("payment_type", arrow::int32()),
+                     arrow::field("VendorID", arrow::int32())}));
+  auto factory =
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();
+
+  arrow::dataset::InspectOptions inspect_options;
+  arrow::dataset::FinishOptions finish_options;
+  auto schema = factory->Inspect(inspect_options).ValueOrDie();
+  auto child = factory->Finish(finish_options).ValueOrDie();
+
+  arrow::dataset::DatasetVector children{1, child};
+  auto dataset =
+      arrow::dataset::UnionDataset::Make(std::move(schema), std::move(children));
+
+  return dataset.ValueOrDie();
+}
+
+std::shared_ptr<arrow::fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
+                                                            std::string* path) {
+  return arrow::fs::FileSystemFromUri(uri, path).ValueOrDie();
+}
+
+std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromPath(
+    std::shared_ptr<arrow::fs::FileSystem> fs,
+    std::shared_ptr<arrow::dataset::FileFormat> format, std::string path) {
+  auto info = fs->GetFileInfo(path).ValueOrDie();
+  return GetDatasetFromDirectory(fs, format, path);
+}
+
+std::shared_ptr<arrow::dataset::Scanner> GetScannerFromDataset(
+    std::shared_ptr<arrow::dataset::Dataset> dataset, std::vector<std::string> columns,
+    arrow::compute::Expression filter, bool use_threads) {
+  auto scanner_builder = dataset->NewScan().ValueOrDie();
+
+  if (!columns.empty()) {
+    scanner_builder->Project(columns);
+  }
+
+  scanner_builder->Filter(filter);

Review comment:
       ```suggestion
     ARROW_EXPECT_OK(scanner_builder->Filter(filter));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log skyhook errors using RADOS object class SDK's logger.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) {
+    hctx_ = hctx;
+    content_length_ = file_size;
+    chunks_ = std::vector<ceph::bufferlist*>();
+  }
+
+  ~RandomAccessObject() { Close(); }
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      ceph::bufferlist* bl = new ceph::bufferlist();
+      cls_cxx_read(hctx_, position, nbytes, bl);
+      chunks_.push_back(bl);
+      return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  arrow::Status Close() {
+    closed_ = true;
+    for (auto chunk : chunks_) {
+      delete chunk;
+    }
+    return arrow::Status::OK();
+  }
+
+  bool closed() const { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  std::vector<ceph::bufferlist*> chunks_;
+};
+
+/// \brief  Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to customize the
+/// scan. \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options) {
+  auto file = std::make_shared<RandomAccessObject>(hctx, req.file_size);
+  auto source = std::make_shared<arrow::dataset::FileSource>(file);
+  ARROW_ASSIGN_OR_RAISE(auto fragment,
+                        format->MakeFragment(*source, req.partition_expression));
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  auto builder = std::make_shared<arrow::dataset::ScannerBuilder>(req.dataset_schema,
+                                                                  fragment, options);
+
+  ARROW_RETURN_NOT_OK(builder->Filter(req.filter_expression));
+  ARROW_RETURN_NOT_OK(builder->Project(req.projection_schema->field_names()));
+  ARROW_RETURN_NOT_OK(builder->UseThreads(true));
+  ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options));
+
+  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
+  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
+  return table;
+}
+
+/// \brief Scan RADOS objects containing Arrow IPC data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[out] result_table A table to store the resultant data.
+/// \return Status.
+static arrow::Status ScanIpcObject(cls_method_context_t hctx, skyhook::ScanRequest req,
+                                   std::shared_ptr<arrow::Table>* result_table) {
+  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
+  auto fragment_scan_options = std::make_shared<arrow::dataset::IpcFragmentScanOptions>();
+
+  ARROW_ASSIGN_OR_RAISE(*result_table, DoScan(hctx, req, format, fragment_scan_options));
+
+  return arrow::Status::OK();
+}
+
+/// \brief Scan RADOS objects containing Parquet binary data.
+/// \param[in] hctx The RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[out] result_table A table to store the resultant data.
+/// \return Status.
+static arrow::Status ScanParquetObject(cls_method_context_t hctx,
+                                       skyhook::ScanRequest req,
+                                       std::shared_ptr<arrow::Table>* result_table) {
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto fragment_scan_options =
+      std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();
+
+  ARROW_ASSIGN_OR_RAISE(*result_table, DoScan(hctx, req, format, fragment_scan_options));

Review comment:
       ```suggestion
     ARROW_ASSIGN_OR_RAISE(*result_table, DoScan(hctx, req, std::move(format), std::move(fragment_scan_options)));
   ```

##########
File path: cpp/src/skyhook/cls/cls_skyhook.cc
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <rados/objclass.h>
+#include <memory>
+
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
+#include "arrow/util/compression.h"
+
+CLS_VER(1, 0)
+CLS_NAME(skyhook)
+
+cls_handle_t h_class;
+cls_method_handle_t h_scan_op;
+
+/// \brief Log msg, prefixed with "error: ", at level 0 via the object class SDK's CLS_LOG.
+void LogSkyhookError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }
+
+/// \class RandomAccessObject
+/// \brief An interface to provide a file-like view over RADOS objects.
+///
+/// Reads are served through `cls_cxx_read` on the object context given at
+/// construction. Every chunk that is read stays owned by this instance, and the
+/// buffers returned by ReadAt()/Read() point into those chunks without copying, so
+/// they must not be used after Close() or after this instance is destroyed.
+class RandomAccessObject : public arrow::io::RandomAccessFile {
+ public:
+  /// \param[in] hctx The RADOS object context used for reads.
+  /// \param[in] file_size The size reported by GetSize() and used to bound reads.
+  explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size)
+      : hctx_(hctx), content_length_(file_size) {}
+
+  // The chunks are owned through std::unique_ptr, so nothing has to be released by hand.
+  ~RandomAccessObject() override = default;
+
+  /// Check if the file stream is closed.
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Check if the position of the object is valid.
+  ///
+  /// \param[in] position The offset to validate against [0, content length].
+  /// \param[in] action The verb ("read", "seek") inserted into the error message.
+  arrow::Status CheckPosition(int64_t position, const char* action) const {
+    if (position < 0) {
+      return arrow::Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (position > content_length_) {
+      return arrow::Status::IOError("Cannot ", action, " past end of file");
+    }
+    return arrow::Status::OK();
+  }
+
+  /// Reading into a caller-provided buffer is not supported; always returns
+  /// NotImplemented.
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
+    return arrow::Status::NotImplemented(
+        "ReadAt has not been implemented in RandomAccessObject");
+  }
+
+  /// Read a specified number of bytes from a specified position.
+  ///
+  /// The request is clamped to the bytes remaining after `position`. The returned
+  /// buffer references memory owned by this instance (see the class comment).
+  /// \return The buffer, an empty buffer when nothing remains to be read, or an
+  /// IOError when the underlying read fails.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position,
+                                                       int64_t nbytes) override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    nbytes = std::min(nbytes, content_length_ - position);
+
+    if (nbytes > 0) {
+      std::unique_ptr<ceph::bufferlist> bl(new ceph::bufferlist());
+      // cls_cxx_read reports failure through a negative return value; surface it
+      // instead of handing back a buffer over whatever the bufferlist contains.
+      const int ret = cls_cxx_read(hctx_, position, nbytes, bl.get());
+      if (ret < 0) {
+        return arrow::Status::IOError("Failed to read ", nbytes, " bytes at position ",
+                                      position, " from RADOS object (error code ", ret,
+                                      ")");
+      }
+      // Keep the chunk alive for as long as this file is open; the Buffer below only
+      // borrows its memory.
+      chunks_.push_back(std::move(bl));
+      ceph::bufferlist& chunk = *chunks_.back();
+      return std::make_shared<arrow::Buffer>(
+          reinterpret_cast<const uint8_t*>(chunk.c_str()), chunk.length());
+    }
+    return std::make_shared<arrow::Buffer>("");
+  }
+
+  /// Read a specified number of bytes from the current position.
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+  /// Read a specified number of bytes from the current position into an output stream.
+  ///
+  /// Delegates to the three-argument ReadAt, which is not implemented.
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  /// Return the size of the file.
+  arrow::Result<int64_t> GetSize() override {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  /// Sets the file-pointer offset, measured from the beginning of the
+  /// file, at which the next read or write occurs.
+  arrow::Status Seek(int64_t position) override {
+    RETURN_NOT_OK(CheckClosed());
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return arrow::Status::OK();
+  }
+
+  /// Returns the file-pointer offset.
+  arrow::Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  /// Closes the file stream and deletes the chunks and releases the memory
+  /// used by the chunks.
+  ///
+  /// Safe to call more than once: the chunk list is emptied, so a later call (or the
+  /// destructor) has nothing left to release.
+  arrow::Status Close() override {
+    closed_ = true;
+    chunks_.clear();
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+ private:
+  cls_method_context_t hctx_;
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = -1;
+  // Owns every bufferlist returned by cls_cxx_read so that the Buffers handed out by
+  // ReadAt() stay valid until Close().
+  std::vector<std::unique_ptr<ceph::bufferlist>> chunks_;
+};
+
+/// \brief  Driver function to execute the Scan operations.
+/// \param[in] hctx RADOS object context.
+/// \param[in] req The scan request received from the client.
+/// \param[in] format The file format instance to use in the scan.
+/// \param[in] fragment_scan_options The fragment scan options to use to customize the
+/// scan. \return Table.
+arrow::Result<std::shared_ptr<arrow::Table>> DoScan(
+    cls_method_context_t hctx, skyhook::ScanRequest req,
+    std::shared_ptr<arrow::dataset::FileFormat> format,
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options) {
+  auto file = std::make_shared<RandomAccessObject>(hctx, req.file_size);
+  auto source = std::make_shared<arrow::dataset::FileSource>(file);
+  ARROW_ASSIGN_OR_RAISE(auto fragment,
+                        format->MakeFragment(*source, req.partition_expression));

Review comment:
       ```suggestion
     arrow::dataset::FileSource source(file);
     ARROW_ASSIGN_OR_RAISE(auto fragment,
                           format->MakeFragment(std::move(source), req.partition_expression));
   ```

##########
File path: cpp/src/skyhook/protocol/rados_protocol.h
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <rados/librados.hpp>
+
+#include "arrow/status.h"
+
+#include "skyhook/client/file_skyhook.h"
+
+namespace skyhook {
+namespace rados {
+
+/// Wrap Arrow Status with a custom return code.
+///
+/// Pairs the arrow::Status describing the outcome of an operation with the integer
+/// code that was passed alongside it at construction.
+class RadosStatus {
+ public:
+  /// \param[in] s the Arrow status to wrap.
+  /// \param[in] code the integer code stored next to the status.
+  RadosStatus(arrow::Status s, int code) : s_(s), code_(code) {}
+  /// \brief Return a copy of the wrapped Arrow status.
+  arrow::Status status() const { return s_; }
+  /// \brief Return the stored integer code.
+  int code() const { return code_; }
+
+ private:
+  arrow::Status s_;
+  int code_;
+};
+
+/// Abstract interface declaring the I/O context operations used by Skyhook.
+class IoCtxInterface {
+ public:
+  IoCtxInterface() {}
+
+  // Implementations are used polymorphically and may own resources, so destroying
+  // one through a pointer to this interface must run the derived destructor.
+  virtual ~IoCtxInterface() = default;
+
+  /// \brief Write data to an object.
+  ///
+  /// \param[in] oid the ID of the object to write.
+  /// \param[in] bl a bufferlist containing the data to write to the object.
+  virtual RadosStatus write_full(const std::string& oid, ceph::bufferlist& bl) = 0;
+
+  /// \brief Read a RADOS object.
+  ///
+  /// \param[in] oid the object ID which to read.
+  /// \param[out] bl a bufferlist to hold the contents of the read object.
+  /// \param[in] len the length of data to read from an object.
+  /// \param[in] offset the offset of the object to read from.
+  virtual RadosStatus read(const std::string& oid, ceph::bufferlist& bl, size_t len,
+                           uint64_t offset) = 0;
+
+  /// \brief Executes a CLS function.
+  ///
+  /// \param[in] oid the object ID on which to execute the CLS function.
+  /// \param[in] cls the name of the CLS.
+  /// \param[in] method the name of the CLS function.
+  /// \param[in] in a bufferlist to send data to the CLS function.
+  /// \param[out] out a bufferlist to receive data from the CLS function.
+  virtual RadosStatus exec(const std::string& oid, const char* cls, const char* method,
+                           ceph::bufferlist& in, ceph::bufferlist& out) = 0;
+
+  /// \brief List objects.
+  ///
+  /// NOTE(review): presumably returns the IDs of the objects reachable through this
+  /// I/O context; confirm against the implementation.
+  virtual std::vector<std::string> list() = 0;
+
+  /// \brief Stat an object.
+  ///
+  /// \param[in] oid the ID of the object to stat.
+  /// \param[out] psize presumably receives the size of the object; confirm against
+  /// the implementation.
+  virtual RadosStatus stat(const std::string& oid, uint64_t* psize) = 0;
+
+ private:
+  friend class RadosWrapper;
+  /// \brief Set the `librados::IoCtx` instance inside a IoCtxInterface instance.
+  virtual void setIoCtx(librados::IoCtx* ioCtx_) = 0;
+};
+
+/// IoCtxInterface implementation that owns a heap-allocated `librados::IoCtx`.
+class IoCtxWrapper : public IoCtxInterface {
+ public:
+  IoCtxWrapper() { ioCtx = new librados::IoCtx(); }
+  ~IoCtxWrapper() { delete ioCtx; }
+
+  // `ioCtx` is an owning raw pointer: a memberwise copy would leave two wrappers
+  // deleting the same IoCtx, so copying is disabled.
+  IoCtxWrapper(const IoCtxWrapper&) = delete;
+  IoCtxWrapper& operator=(const IoCtxWrapper&) = delete;
+
+  RadosStatus write_full(const std::string& oid, ceph::bufferlist& bl) override;
+  RadosStatus read(const std::string& oid, ceph::bufferlist& bl, size_t len,
+                   uint64_t offset) override;
+  RadosStatus exec(const std::string& oid, const char* cls, const char* method,
+                   ceph::bufferlist& in, ceph::bufferlist& out) override;
+  std::vector<std::string> list() override;
+
+  RadosStatus stat(const std::string& oid, uint64_t* psize) override;
+
+ private:
+  /// Copy-assigns `*ioCtx_` into the IoCtx owned by this wrapper; the pointer itself
+  /// is not retained.
+  void setIoCtx(librados::IoCtx* ioCtx_) override { *ioCtx = *ioCtx_; }
+  librados::IoCtx* ioCtx;
+};
+
+/// Abstract interface declaring the cluster-level operations used by Skyhook.
+class RadosInterface {
+ public:
+  RadosInterface() {}
+
+  // Implementations are used polymorphically and may own resources, so destroying
+  // one through a pointer to this interface must run the derived destructor.
+  virtual ~RadosInterface() = default;
+
+  /// \brief Initializes a cluster handle.
+  ///
+  /// \param[in] name the username of the client.
+  /// \param[in] clustername the name of the Ceph cluster.
+  /// \param[in] flags some extra flags to pass.
+  virtual RadosStatus init2(const char* const name, const char* const clustername,
+                            uint64_t flags) = 0;
+
+  /// \brief Create an I/O context
+  ///
+  /// \param[in] name the RADOS pool to connect to.
+  /// \param[in] pioctx an instance of IoCtxInterface.
+  virtual RadosStatus ioctx_create(const char* name, IoCtxInterface* pioctx) = 0;
+
+  /// \brief Read the Ceph config file.
+  ///
+  /// \param[in] path the path to the config file.
+  virtual RadosStatus conf_read_file(const char* const path) = 0;
+
+  /// \brief Connect to the Ceph cluster.
+  virtual RadosStatus connect() = 0;
+
+  /// \brief Close connection to the Ceph cluster.
+  virtual void shutdown() = 0;
+};
+
+/// RadosInterface implementation that owns a heap-allocated `librados::Rados`.
+class RadosWrapper : public RadosInterface {
+ public:
+  RadosWrapper() { cluster = new librados::Rados(); }
+  ~RadosWrapper() { delete cluster; }
+
+  // `cluster` is an owning raw pointer: a memberwise copy would leave two wrappers
+  // deleting the same Rados handle, so copying is disabled.
+  RadosWrapper(const RadosWrapper&) = delete;
+  RadosWrapper& operator=(const RadosWrapper&) = delete;
+
+  RadosStatus init2(const char* const name, const char* const clustername,
+                    uint64_t flags) override;
+  RadosStatus ioctx_create(const char* name, IoCtxInterface* pioctx) override;
+  RadosStatus conf_read_file(const char* const path) override;
+  RadosStatus connect() override;
+  void shutdown() override;
+
+ private:
+  librados::Rados* cluster;
+};
+
+/// Connect to a Ceph cluster and hold the connection
+/// information for use in later stages.
+class RadosConn {
+ public:
+  explicit RadosConn(std::shared_ptr<skyhook::RadosConnCtx> ctx)
+      : ctx(ctx),

Review comment:
       ```suggestion
         : ctx(std::move(ctx)),
   ```

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol.h
##########
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "skyhook/protocol/rados_protocol.h"
+
+#include <sys/stat.h>
+#include <sstream>
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+
+#define SCAN_UNKNOWN_ERR_MSG "something went wrong while scanning file fragment"
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace skyhook {
+
+/// A struct scoping the `type` enum, which names the different
+/// types of file formats that Skyhook supports (Parquet and Arrow IPC).
+struct SkyhookFileType {
+  enum type { PARQUET, IPC };  // the numeric value is what the scan request carries
+};
+
+/// A struct encapsulating all the parameters
+/// required to be serialized in the form of flatbuffers for
+/// sending to the cls.
+struct ScanRequest {
+  arrow::compute::Expression filter_expression;  // filter taken from the scan options
+  arrow::compute::Expression partition_expression;  // partition expression of the fragment
+  std::shared_ptr<arrow::Schema> projection_schema;  // its field names give the projection
+  std::shared_ptr<arrow::Schema> dataset_schema;  // dataset schema from the scan options
+  int64_t file_size;  // size of the file to scan; has no default value
+  SkyhookFileType::type file_format;  // format of the file to scan; has no default value
+};
+
+/// Utility functions to serialize and deserialize scan requests and result Arrow tables.
+arrow::Status SerializeScanRequest(ScanRequest req, ceph::bufferlist& bl);
+arrow::Status DeserializeScanRequest(ScanRequest& req, ceph::bufferlist bl);
+arrow::Status SerializeTable(std::shared_ptr<arrow::Table> table, ceph::bufferlist& bl);
+arrow::Status DeserializeTable(arrow::RecordBatchVector& batches, ceph::bufferlist bl,
+                               bool use_threads);
+
+/// Utility function to invoke a RADOS object class function on an RADOS object.
+arrow::Status ExecuteObjectClassFn(std::shared_ptr<rados::RadosConn> connection_,
+                                   const std::string& oid, const std::string& fn,
+                                   ceph::bufferlist& in, ceph::bufferlist& out);
+
+/// An interface for translating the name of a file in CephFS to its
+/// corresponding object ID in RADOS assuming 1:1 mapping between a file
+/// and its underlying object.
+class SkyhookDirectObjectAccess {
+ public:
+  explicit SkyhookDirectObjectAccess(const std::shared_ptr<rados::RadosConn>& connection)
+      : connection_(std::move(connection)) {}

Review comment:
       ```suggestion
         : connection_(connection) {}
   ```

##########
File path: cpp/src/skyhook/protocol/rados_protocol.h
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <rados/librados.hpp>
+
+#include "arrow/status.h"
+
+#include "skyhook/client/file_skyhook.h"
+
+namespace skyhook {
+namespace rados {
+
+/// Wrap Arrow Status with a custom return code.
+class RadosStatus {
+ public:
+  RadosStatus(arrow::Status s, int code) : s_(s), code_(code) {}

Review comment:
       ```suggestion
     RadosStatus(arrow::Status s, int code) : s_(std::move(s)), code_(code) {}
   ```

##########
File path: cpp/src/skyhook/client/file_skyhook.h
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"

Review comment:
       This is good!  `file_skyhook.h` has no direct dependency on rados which was the goal.  Thank you.

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol.cc
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include <flatbuffers/flatbuffers.h>
+
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/result.h"
+
+#include "ScanRequest_generated.h"
+
+namespace skyhook {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
+arrow::Status SerializeScanRequest(ScanRequest req, ceph::bufferlist& bl) {
+  auto filter_expression = arrow::compute::Serialize(req.filter_expression).ValueOrDie();
+  auto partition_expression =
+      arrow::compute::Serialize(req.partition_expression).ValueOrDie();
+  auto projection_schema =
+      arrow::ipc::SerializeSchema(*req.projection_schema).ValueOrDie();
+  auto dataset_schema = arrow::ipc::SerializeSchema(*req.dataset_schema).ValueOrDie();

Review comment:
       Replace `ValueOrDie` with `ARROW_ASSIGN_OR_RAISE`

##########
File path: cpp/cmake_modules/Findlibrados.cmake
##########
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       It's unfortunate there is no librados2 in conda-forge.  I tried to make it work with a hybrid build but my system librados2 linked with system libcrypto while parquet linked with conda's libssl and the mismatch caused errors.  I ended up making a cmake build that used all system libraries and that worked.

##########
File path: cpp/src/skyhook/client/file_skyhook.cc
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/client/file_skyhook.h"
+#include "skyhook/protocol/rados_protocol.h"
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/util/compression.h"
+
+namespace skyhook {
+
+/// A ScanTask to scan a file fragment in Skyhook format.
+///
+/// Execute() does not read the file on the client: it serializes a ScanRequest, runs
+/// the `scan_op` object class method through SkyhookDirectObjectAccess and decodes the
+/// returned bufferlist into record batches.
+class SkyhookScanTask : public arrow::dataset::ScanTask {
+ public:
+  /// \param[in] options the scan options (filter, projected and dataset schema).
+  /// \param[in] fragment the fragment this task belongs to.
+  /// \param[in] source the file source whose path is stat'ed and scanned.
+  /// \param[in] doa the object access helper used to stat the file and run `scan_op`.
+  /// \param[in] file_format the format of the file to scan.
+  /// \param[in] partition_expression the partition expression sent with the request.
+  SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
+                  std::shared_ptr<arrow::dataset::Fragment> fragment,
+                  arrow::dataset::FileSource source,
+                  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
+                  skyhook::SkyhookFileType::type file_format,
+                  arrow::compute::Expression partition_expression)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        file_format_(file_format),
+        // Taken by value like the other arguments, so move it instead of copying.
+        partition_expression_(std::move(partition_expression)) {}
+
+  /// \brief Run the scan and return an iterator over the resulting record batches.
+  /// \return The iterator, or the first error raised while stat'ing the file,
+  /// serializing the request, executing `scan_op` or decoding the result.
+  arrow::Result<arrow::RecordBatchIterator> Execute() override {
+    /// Retrieve the size of the file using POSIX `stat`.
+    struct stat st {};
+    RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    /// Create a ScanRequest instance.
+    skyhook::ScanRequest req;
+    req.filter_expression = options_->filter;
+    req.partition_expression = partition_expression_;
+    req.projection_schema = options_->projected_schema;
+    req.dataset_schema = options_->dataset_schema;
+    req.file_size = st.st_size;
+    req.file_format = file_format_;
+
+    /// Serialize the ScanRequest into a ceph bufferlist.
+    ceph::bufferlist request;
+    RETURN_NOT_OK(skyhook::SerializeScanRequest(req, request));
+
+    /// Execute the Ceph object class method `scan_op`.
+    ceph::bufferlist result;
+    RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    /// Read RecordBatches from the result bufferlist. Since, this step might use
+    /// threads for decompressing compressed batches, to avoid running into
+    /// [ARROW-12597], we switch off threaded decompression to avoid nested threading
+    /// scenarios when scan tasks are executed in parallel by the CpuThreadPool.
+    arrow::RecordBatchVector batches;
+    RETURN_NOT_OK(skyhook::DeserializeTable(batches, result, !options_->use_threads));
+    // `batches` is not used afterwards; move it so the vector is not copied.
+    return arrow::MakeVectorIterator(std::move(batches));
+  }
+
+ protected:
+  arrow::dataset::FileSource source_;
+  std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
+  skyhook::SkyhookFileType::type file_format_;
+  arrow::compute::Expression partition_expression_;
+};
+
+class SkyhookFileFormat::Impl {
+ public:
+  Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
+      : ctx_(std::move(ctx)), file_format_(file_format) {}
+
+  ~Impl() {}
+
+  arrow::Status Init() {
+    /// Connect to the RADOS cluster and instantiate a `SkyhookDirectObjectAccess`
+    /// instance.
+    auto connection = std::make_shared<skyhook::rados::RadosConn>(ctx_);
+    RETURN_NOT_OK(connection->Connect());
+    doa_ = std::make_shared<skyhook::SkyhookDirectObjectAccess>(connection);
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options,
+      const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
+    /// Make sure client-side filtering and projection is turned off.
+    file->handles_compute = false;
+
+    /// Convert string file format name to Enum.
+    skyhook::SkyhookFileType::type file_format;
+    if (file_format_ == "parquet") {
+      file_format = skyhook::SkyhookFileType::type::PARQUET;
+    } else if (file_format_ == "ipc") {
+      file_format = skyhook::SkyhookFileType::type::IPC;
+    } else {
+      return arrow::Status::Invalid("Unsupported file format.");
+    }
+
+    arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>(
+        std::move(options), std::move(file), file->source(), std::move(doa_), file_format,

Review comment:
       Is this move of `doa_` safe?

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -117,6 +117,8 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// makes extensive use of threading and is still considered experimental
   bool use_async = false;
 
+  bool skip_compute = false;
+

Review comment:
       Is this used?

##########
File path: cpp/src/arrow/dataset/scanner_internal.h
##########
@@ -185,6 +185,10 @@ inline Result<ScanTaskIterator> GetScanTaskIterator(
   auto fn = [options](std::shared_ptr<Fragment> fragment) -> Result<ScanTaskIterator> {
     ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options));
 
+    if (!fragment->handles_compute) {
+      return std::move(scan_task_it);
+    }
+

Review comment:
       The `AsyncScanner` will need the equivalent.  I think it would belong in `arrow::dataset::AsyncScanner::ScanBatchesUnorderedAsync`

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol.cc
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include <flatbuffers/flatbuffers.h>
+
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/result.h"
+
+#include "ScanRequest_generated.h"
+
+namespace skyhook {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
+/// \brief Serialize a ScanRequest into a flatbuffer and append it to a bufferlist.
+///
+/// \param[in] req the scan request to serialize.
+/// \param[out] bl the bufferlist that the serialized bytes are appended to.
+/// \return OK on success; Invalid when a schema is missing; otherwise the error
+/// raised while serializing one of the expressions or schemas.
+arrow::Status SerializeScanRequest(ScanRequest req, ceph::bufferlist& bl) {
+  if (req.projection_schema == nullptr || req.dataset_schema == nullptr) {
+    return arrow::Status::Invalid("Scan request is missing a schema");
+  }
+
+  // Propagate failures to the caller instead of aborting through ValueOrDie().
+  ARROW_ASSIGN_OR_RAISE(auto filter_expression,
+                        arrow::compute::Serialize(req.filter_expression));
+  ARROW_ASSIGN_OR_RAISE(auto partition_expression,
+                        arrow::compute::Serialize(req.partition_expression));
+  ARROW_ASSIGN_OR_RAISE(auto projection_schema,
+                        arrow::ipc::SerializeSchema(*req.projection_schema));
+  ARROW_ASSIGN_OR_RAISE(auto dataset_schema,
+                        arrow::ipc::SerializeSchema(*req.dataset_schema));
+
+  flatbuffers::FlatBufferBuilder builder(1024);
+  auto filter_expression_vector =
+      builder.CreateVector(filter_expression->data(), filter_expression->size());
+  auto partition_expression_vector =
+      builder.CreateVector(partition_expression->data(), partition_expression->size());
+  auto projected_schema_vector =
+      builder.CreateVector(projection_schema->data(), projection_schema->size());
+  auto dataset_schema_vector =
+      builder.CreateVector(dataset_schema->data(), dataset_schema->size());
+
+  auto request = flatbuf::CreateScanRequest(
+      builder, req.file_size, static_cast<int>(req.file_format), filter_expression_vector,
+      partition_expression_vector, dataset_schema_vector, projected_schema_vector);
+  builder.Finish(request);
+  uint8_t* buf = builder.GetBufferPointer();
+  int size = builder.GetSize();
+
+  bl.append(reinterpret_cast<const char*>(buf), size);
+  return arrow::Status::OK();
+}
+
+/// \brief Rebuild a ScanRequest from the flatbuffer stored in a bufferlist.
+///
+/// \param[out] req the scan request to populate. When an error is returned, the
+/// fields decoded before the failure have already been overwritten.
+/// \param[in] bl the bufferlist holding the serialized request.
+/// \return OK on success; Invalid when a required field is absent; otherwise the
+/// error raised while decoding one of the expressions or schemas.
+arrow::Status DeserializeScanRequest(ScanRequest& req, ceph::bufferlist bl) {
+  auto request = flatbuf::GetScanRequest(reinterpret_cast<const uint8_t*>(bl.c_str()));
+
+  // NOTE(review): the flatbuffer is read without running a flatbuffers::Verifier;
+  // only absent vector fields are rejected here before they are dereferenced.
+  if (request->filter() == nullptr || request->partition() == nullptr ||
+      request->projection_schema() == nullptr || request->dataset_schema() == nullptr) {
+    return arrow::Status::Invalid("Scan request is missing a required field");
+  }
+
+  // Propagate decoding failures to the caller instead of aborting through
+  // ValueOrDie().
+  ARROW_ASSIGN_OR_RAISE(req.filter_expression,
+                        arrow::compute::Deserialize(std::make_shared<arrow::Buffer>(
+                            request->filter()->data(), request->filter()->size())));
+
+  ARROW_ASSIGN_OR_RAISE(
+      req.partition_expression,
+      arrow::compute::Deserialize(std::make_shared<arrow::Buffer>(
+          request->partition()->data(), request->partition()->size())));
+
+  arrow::ipc::DictionaryMemo empty_memo;
+  arrow::io::BufferReader projection_schema_reader(request->projection_schema()->data(),
+                                                   request->projection_schema()->size());
+  arrow::io::BufferReader dataset_schema_reader(request->dataset_schema()->data(),
+                                                request->dataset_schema()->size());
+
+  ARROW_ASSIGN_OR_RAISE(req.projection_schema,
+                        arrow::ipc::ReadSchema(&projection_schema_reader, &empty_memo));
+  ARROW_ASSIGN_OR_RAISE(req.dataset_schema,
+                        arrow::ipc::ReadSchema(&dataset_schema_reader, &empty_memo));
+
+  req.file_size = request->file_size();
+  req.file_format = static_cast<SkyhookFileType::type>(request->file_format());
+  return arrow::Status::OK();
+}
+
+arrow::Status SerializeTable(std::shared_ptr<arrow::Table> table, ceph::bufferlist& bl) {
+  auto buffer_output_stream = arrow::io::BufferOutputStream::Create().ValueOrDie();
+
+  auto options = arrow::ipc::IpcWriteOptions::Defaults();
+  auto codec = arrow::Compression::LZ4_FRAME;
+
+  options.codec =
+      arrow::util::Codec::Create(codec, std::numeric_limits<int>::min()).ValueOrDie();

Review comment:
       `std::numeric_limits<int>::min()` should be the default.  Don't specify the second argument in case that changes someday.

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, ScanRequestSerializeDeserialize) {
+  ceph::bufferlist bl;
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  skyhook::SerializeScanRequest(req, bl);
+
+  skyhook::ScanRequest req_;
+  skyhook::DeserializeScanRequest(req_, bl);
+  ASSERT_EQ(req.filter_expression.Equals(req_.filter_expression), 1);
+  ASSERT_EQ(req.partition_expression.Equals(req_.partition_expression), 1);
+  ASSERT_EQ(req.projection_schema->Equals(req_.projection_schema), 1);
+  ASSERT_EQ(req.dataset_schema->Equals(req_.dataset_schema), 1);
+  ASSERT_EQ(req.file_size, req_.file_size);
+  ASSERT_EQ(req.file_format, req_.file_format);
+}
+
+TEST(TestSkyhookProtocol, SerializeDeserializeTable) {
+  std::shared_ptr<arrow::Table> table = CreateTable();
+  ceph::bufferlist bl;
+  skyhook::SerializeTable(table, bl);

Review comment:
       ```suggestion
     ASSERT_OK(skyhook::SerializeTable(table, bl));
   ```

##########
File path: cpp/src/skyhook/CMakeLists.txt
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       @kou @kszucs Any thoughts on the cmake configurations added as part of this PR?

##########
File path: ci/scripts/integration_skyhook.sh
##########
@@ -0,0 +1,134 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+set -x
+set -u
+
+ARROW_BUILD_DIR=${1}/cpp
+DIR=/tmp/integration-skyhook
+
+# reset
+pkill ceph || true
+rm -rf ${DIR}/*
+LOG_DIR=${DIR}/log
+MON_DATA=${DIR}/mon
+MDS_DATA=${DIR}/mds
+MOUNTPT=${MDS_DATA}/mnt
+OSD_DATA=${DIR}/osd
+mkdir -p ${LOG_DIR} ${MON_DATA} ${OSD_DATA} ${MDS_DATA} ${MOUNTPT}
+MDS_NAME="Z"
+MON_NAME="a"
+MGR_NAME="x"
+MIRROR_ID="m"
+
+# cluster wide parameters
+cat >> ${DIR}/ceph.conf <<EOF
+[global]
+fsid = $(uuidgen)
+osd crush chooseleaf type = 0
+run dir = ${DIR}/run
+auth cluster required = none
+auth service required = none
+auth client required = none
+osd pool default size = 1
+mon host = ${HOSTNAME}
+[mds.${MDS_NAME}]
+host = ${HOSTNAME}
+[mon.${MON_NAME}]
+log file = ${LOG_DIR}/mon.log
+chdir = ""
+mon cluster log file = ${LOG_DIR}/mon-cluster.log
+mon data = ${MON_DATA}
+mon data avail crit = 0
+mon addr = ${HOSTNAME}
+mon allow pool delete = true
+[osd.0]
+log file = ${LOG_DIR}/osd.log
+chdir = ""
+osd data = ${OSD_DATA}
+osd journal = ${OSD_DATA}.journal
+osd journal size = 100
+osd objectstore = memstore
+osd class load list = *
+osd class default list = *
+EOF
+
+export CEPH_CONF=${DIR}/ceph.conf
+cp $CEPH_CONF /etc/ceph/ceph.conf
+
+# start a mon
+ceph-mon --id ${MON_NAME} --mkfs --keyring /dev/null
+touch ${MON_DATA}/keyring
+ceph-mon --id ${MON_NAME}
+
+# start an osd
+OSD_ID=$(ceph osd create)
+ceph osd crush add osd.${OSD_ID} 1 root=default
+ceph-osd --id ${OSD_ID} --mkjournal --mkfs
+ceph-osd --id ${OSD_ID} || ceph-osd --id ${OSD_ID} || ceph-osd --id ${OSD_ID}
+
+# start an mds for cephfs
+ceph auth get-or-create mds.${MDS_NAME} mon 'profile mds' mgr 'profile mds' mds 'allow *' osd 'allow *' > ${MDS_DATA}/keyring
+ceph osd pool create cephfs_data 8
+ceph osd pool create cephfs_metadata 8
+ceph fs new cephfs cephfs_metadata cephfs_data
+ceph fs ls
+ceph-mds -i ${MDS_NAME}
+ceph status
+while [[ ! $(ceph mds stat | grep "up:active") ]]; do sleep 1; done
+
+# start a manager
+ceph-mgr --id ${MGR_NAME}
+
+# test the setup
+ceph --version
+ceph status
+
+apt update
+apt install -y ceph-fuse attr
+
+pushd ${ARROW_BUILD_DIR}
+    # create the rados-classes, if not there already
+    mkdir -p /usr/lib/x86_64-linux-gnu/rados-classes/
+    cp debug/libcls_skyhook* /usr/lib/x86_64-linux-gnu/rados-classes/
+
+    # mount a ceph filesystem to /mnt/cephfs in the user-space using ceph-fuse
+    mkdir -p /mnt/cephfs
+    ceph-fuse /mnt/cephfs
+    sleep 5
+
+    # download an example dataset and copy into the mounted dir
+    rm -rf nyc*
+    wget https://raw.githubusercontent.com/JayjeetAtGithub/zips/main/nyc.zip
+    unzip nyc.zip
+    cp -r nyc /mnt/cephfs/
+    sleep 10

Review comment:
       I think it would be nicer if this test data were generated rather than downloaded, although the risk should be somewhat isolated since this only runs in a Docker container.  You mentioned generate_dataset.py; using that here would be great.

##########
File path: cpp/src/skyhook/protocol/skyhook_protocol_test.cc
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "skyhook/protocol/skyhook_protocol.h"
+
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+
+std::shared_ptr<arrow::Table> CreateTable() {
+  auto schema = arrow::schema({
+      {arrow::field("a", arrow::uint8())},
+      {arrow::field("b", arrow::uint32())},
+  });
+
+  std::shared_ptr<arrow::Table> table;
+  return TableFromJSON(schema, {R"([{"a": null, "b": 5},
+                                     {"a": 1,    "b": 3},
+                                     {"a": 3,    "b": null},
+                                     {"a": null, "b": null},
+                                     {"a": 2,    "b": 5},
+                                     {"a": 1,    "b": 5}
+                                    ])"});
+}
+
+TEST(TestSkyhookProtocol, ScanRequestSerializeDeserialize) {
+  ceph::bufferlist bl;
+  skyhook::ScanRequest req;
+  req.filter_expression = arrow::compute::literal(true);
+  req.partition_expression = arrow::compute::literal(false);
+  req.projection_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
+  req.file_size = 1000000;
+  req.file_format = skyhook::SkyhookFileType::type::IPC;
+  skyhook::SerializeScanRequest(req, bl);
+
+  skyhook::ScanRequest req_;
+  skyhook::DeserializeScanRequest(req_, bl);
+  ASSERT_EQ(req.filter_expression.Equals(req_.filter_expression), 1);
+  ASSERT_EQ(req.partition_expression.Equals(req_.partition_expression), 1);
+  ASSERT_EQ(req.projection_schema->Equals(req_.projection_schema), 1);
+  ASSERT_EQ(req.dataset_schema->Equals(req_.dataset_schema), 1);
+  ASSERT_EQ(req.file_size, req_.file_size);
+  ASSERT_EQ(req.file_format, req_.file_format);
+}
+
+TEST(TestSkyhookProtocol, SerializeDeserializeTable) {
+  std::shared_ptr<arrow::Table> table = CreateTable();
+  ceph::bufferlist bl;
+  skyhook::SerializeTable(table, bl);
+
+  arrow::RecordBatchVector batches;
+  skyhook::DeserializeTable(batches, bl, false);

Review comment:
       ```suggestion
     ASSERT_OK(skyhook::DeserializeTable(batches, bl, false));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org