Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/02 23:33:38 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #10431: ARROW-12921: [C++][Dataset] Add RadosParquetFileFormat to Dataset API

westonpace commented on a change in pull request #10431:
URL: https://github.com/apache/arrow/pull/10431#discussion_r681318901



##########
File path: cpp/src/arrow/dataset/file_skyhook.h
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This API is EXPERIMENTAL.
+#define _FILE_OFFSET_BITS 64
+
+#pragma once
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/discovery.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/rados.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/macros.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/exception.h"
+
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace arrow {
+namespace dataset {
+
+enum SkyhookFileType { PARQUET, IPC };
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+namespace connection {
+/// \brief An interface for general connections.
+class ARROW_DS_EXPORT Connection {

Review comment:
       I'm not sure this is adding much at the moment.  Is the only implementation `RadosConnection`?  If you plan on having multiple implementations then I think the interface could be added back in when that happens.  Otherwise, it is not clear from the code what the future goal is for this type.

##########
File path: cpp/src/arrow/dataset/file_skyhook.cc
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "arrow/dataset/file_skyhook.h"
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/util_internal.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/compression.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+
+#include <flatbuffers/flatbuffers.h>
+
+#include "generated/ScanRequest_generated.h"
+
+namespace arrow {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
+namespace dataset {
+
+namespace connection {
+
+std::mutex connection_mutex;
+
+RadosConnection::~RadosConnection() { Shutdown(); }
+
+Status RadosConnection::Connect() {
+  if (connected) {
+    return Status::OK();
+  }
+
+  // Locks the mutex. Only one thread can pass here at a time.
+  // Another thread handled the connection already.
+  std::unique_lock<std::mutex> lock(connection_mutex);
+  if (connected) {
+    return Status::OK();
+  }
+  connected = true;
+
+  if (rados->init2(ctx.user_name.c_str(), ctx.cluster_name.c_str(), 0))
+    return Status::Invalid("librados::init2 returned non-zero exit code.");
+
+  if (rados->conf_read_file(ctx.ceph_config_path.c_str()))
+    return Status::Invalid("librados::conf_read_file returned non-zero exit code.");
+
+  if (rados->connect())
+    return Status::Invalid("librados::connect returned non-zero exit code.");
+
+  if (rados->ioctx_create(ctx.data_pool.c_str(), ioCtx))
+    return Status::Invalid("librados::ioctx_create returned non-zero exit code.");
+
+  return Status::OK();
+}
+
+Status RadosConnection::Shutdown() {
+  rados->shutdown();
+  return Status::OK();
+}
+
+}  // namespace connection
+
+/// \brief A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<ScanOptions> options,
+                  std::shared_ptr<Fragment> fragment, FileSource source,
+                  std::shared_ptr<SkyhookDirectObjectAccess> doa, int fragment_format)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        fragment_format_(fragment_format) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    struct stat st {};
+    ARROW_RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    ceph::bufferlist request;
+    ARROW_RETURN_NOT_OK(
+        SerializeScanRequest(options_, fragment_format_, st.st_size, request));
+
+    ceph::bufferlist result;
+    ARROW_RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    RecordBatchVector batches;
+    ARROW_RETURN_NOT_OK(DeserializeTable(batches, result, !options_->use_threads));

Review comment:
       Why `!options_->use_threads`?

##########
File path: cpp/src/arrow/dataset/scanner_internal.h
##########
@@ -185,6 +185,12 @@ inline Result<ScanTaskIterator> GetScanTaskIterator(
   auto fn = [options](std::shared_ptr<Fragment> fragment) -> Result<ScanTaskIterator> {
     ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options));
 
+    if (fragment->type_name() == "skyhook") {

Review comment:
       Is Skyhook actually doing projection?  I think filtering is fine but if projection is done early at the format level then I believe that may cause issues when executing datasets that come from a variety of sources (e.g. unions and joins).

##########
File path: cpp/src/arrow/dataset/file_skyhook.cc
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "arrow/dataset/file_skyhook.h"
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/util_internal.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/compression.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+
+#include <flatbuffers/flatbuffers.h>
+
+#include "generated/ScanRequest_generated.h"
+
+namespace arrow {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
+namespace dataset {
+
+namespace connection {
+
+std::mutex connection_mutex;
+
+RadosConnection::~RadosConnection() { Shutdown(); }
+
+Status RadosConnection::Connect() {
+  if (connected) {
+    return Status::OK();
+  }
+
+  // Locks the mutex. Only one thread can pass here at a time.
+  // Another thread handled the connection already.
+  std::unique_lock<std::mutex> lock(connection_mutex);
+  if (connected) {
+    return Status::OK();
+  }
+  connected = true;
+
+  if (rados->init2(ctx.user_name.c_str(), ctx.cluster_name.c_str(), 0))
+    return Status::Invalid("librados::init2 returned non-zero exit code.");
+
+  if (rados->conf_read_file(ctx.ceph_config_path.c_str()))
+    return Status::Invalid("librados::conf_read_file returned non-zero exit code.");
+
+  if (rados->connect())
+    return Status::Invalid("librados::connect returned non-zero exit code.");
+
+  if (rados->ioctx_create(ctx.data_pool.c_str(), ioCtx))
+    return Status::Invalid("librados::ioctx_create returned non-zero exit code.");
+
+  return Status::OK();
+}
+
+Status RadosConnection::Shutdown() {
+  rados->shutdown();
+  return Status::OK();
+}
+
+}  // namespace connection
+
+/// \brief A ScanTask to scan a file fragment in Skyhook format.
+class SkyhookScanTask : public ScanTask {
+ public:
+  SkyhookScanTask(std::shared_ptr<ScanOptions> options,
+                  std::shared_ptr<Fragment> fragment, FileSource source,
+                  std::shared_ptr<SkyhookDirectObjectAccess> doa, int fragment_format)
+      : ScanTask(std::move(options), std::move(fragment)),
+        source_(std::move(source)),
+        doa_(std::move(doa)),
+        fragment_format_(fragment_format) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    struct stat st {};
+    ARROW_RETURN_NOT_OK(doa_->Stat(source_.path(), st));
+
+    ceph::bufferlist request;
+    ARROW_RETURN_NOT_OK(
+        SerializeScanRequest(options_, fragment_format_, st.st_size, request));
+
+    ceph::bufferlist result;
+    ARROW_RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));
+
+    RecordBatchVector batches;
+    ARROW_RETURN_NOT_OK(DeserializeTable(batches, result, !options_->use_threads));
+    return MakeVectorIterator(batches);
+  }
+
+ protected:
+  FileSource source_;
+  std::shared_ptr<SkyhookDirectObjectAccess> doa_;
+  int fragment_format_;
+};
+
+SkyhookFileFormat::SkyhookFileFormat(const std::string& fragment_format,
+                                     const std::string& ceph_config_path,
+                                     const std::string& data_pool,
+                                     const std::string& user_name,
+                                     const std::string& cluster_name,
+                                     const std::string& cls_name)
+    : SkyhookFileFormat(std::make_shared<connection::RadosConnection>(
+          connection::RadosConnection::RadosConnectionCtx(
+              ceph_config_path, data_pool, user_name, cluster_name, cls_name))) {
+  fragment_format_ = fragment_format;
+}
+
+SkyhookFileFormat::SkyhookFileFormat(
+    const std::shared_ptr<connection::RadosConnection>& connection) {
+  connection->Connect();

Review comment:
       I'm not sure what resources are consumed when calling `Connect`, but generally I wouldn't expect any connections to be opened just by creating a `SkyhookFileFormat` instance.  It might make more sense to establish the connection on `Inspect` or `ScanFile`.
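
   A minimal sketch of deferring the connection until first use.  The types here (`LazyConnection`, `FakeFormat`) and the `connect_calls` counter are hypothetical stand-ins, not the PR's classes, and the real connect step would call into librados where the comment indicates.

```cpp
#include <mutex>
#include <string>

// Hypothetical stand-in for RadosConnection. It only counts how often the
// (potentially expensive) connect step runs.
class LazyConnection {
 public:
  // Opens the connection on first use; later calls are no-ops.
  // Returns true once the connection is usable.
  bool EnsureConnected() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!connected_) {
      ++connect_calls_;   // a real implementation would talk to the cluster here
      connected_ = true;  // set only after the connect step has succeeded
    }
    return connected_;
  }

  int connect_calls() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return connect_calls_;
  }

 private:
  mutable std::mutex mutex_;
  bool connected_ = false;
  int connect_calls_ = 0;
};

// Hypothetical stand-in for SkyhookFileFormat.
class FakeFormat {
 public:
  // Constructing the format performs no I/O.
  FakeFormat() = default;

  // Both entry points connect lazily before doing any work.
  bool Inspect(const std::string& /*path*/) { return connection_.EnsureConnected(); }
  bool ScanFile(const std::string& /*path*/) { return connection_.EnsureConnected(); }

  int connect_calls() const { return connection_.connect_calls(); }

 private:
  LazyConnection connection_;
};
```

   With this shape, constructing a `FakeFormat` leaves `connect_calls()` at 0, and any number of `Inspect` or `ScanFile` calls afterwards raise it to exactly 1.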

##########
File path: cpp/src/arrow/dataset/file_skyhook.h
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This API is EXPERIMENTAL.
+#define _FILE_OFFSET_BITS 64
+
+#pragma once
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/discovery.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/rados.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/macros.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/exception.h"
+
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace arrow {
+namespace dataset {
+
+enum SkyhookFileType { PARQUET, IPC };
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+namespace connection {
+/// \brief An interface for general connections.
+class ARROW_DS_EXPORT Connection {
+ public:
+  virtual Status Connect() = 0;
+
+  Connection() = default;
+  virtual ~Connection() = default;
+};
+
+/// \class RadosConnection
+/// \brief An interface to connect to a Rados cluster and hold the connection
+/// information for usage in later stages.
+class ARROW_DS_EXPORT RadosConnection : public Connection {
+ public:
+  struct RadosConnectionCtx {
+    std::string ceph_config_path;
+    std::string data_pool;
+    std::string user_name;
+    std::string cluster_name;
+    std::string cls_name;
+
+    RadosConnectionCtx(const std::string& ceph_config_path, const std::string& data_pool,
+                       const std::string& user_name, const std::string& cluster_name,
+                       const std::string& cls_name)
+        : ceph_config_path(ceph_config_path),
+          data_pool(data_pool),
+          user_name(user_name),
+          cluster_name(cluster_name),
+          cls_name(cls_name) {}
+  };
+  explicit RadosConnection(const RadosConnectionCtx& ctx)
+      : Connection(),
+        ctx(ctx),
+        rados(new RadosWrapper()),
+        ioCtx(new IoCtxWrapper()),
+        connected(false) {}
+
+  ~RadosConnection();
+
+  /// \brief Connect to the Rados cluster.
+  /// \return Status.
+  Status Connect();
+
+  /// \brief Shutdown the connection to the Rados cluster.
+  /// \return Status.
+  Status Shutdown();
+
+  RadosConnectionCtx ctx;
+  RadosInterface* rados;
+  IoCtxInterface* ioCtx;
+  bool connected;
+};
+
+}  // namespace connection
+
+/// \class SkyhookDirectObjectAccess
+/// \brief Interface for translating the name of a file in CephFS to its
+/// corresponding object ID in RADOS assuming 1:1 mapping between a file
+/// and its underlying object.
+class ARROW_DS_EXPORT SkyhookDirectObjectAccess {
+ public:
+  explicit SkyhookDirectObjectAccess(
+      const std::shared_ptr<connection::RadosConnection>& connection)
+      : connection_(std::move(connection)) {}
+
+  /// \brief Executes the POSIX stat call on a file.
+  /// \param[in] path Path of the file.
+  /// \param[out] st Reference to the struct object to store the result.
+  /// \return Status.
+  Status Stat(const std::string& path, struct stat& st) {

Review comment:
       Since there are no templates involved I think the implementation here should go in a .cc file (preferred) or these functions will need to be marked inline.  Another advantage of moving the implementation to .cc files is that some of the includes (e.g. sstream) might be able to be removed from the header.

##########
File path: cpp/src/arrow/dataset/file_skyhook_test.cc
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_skyhook.h"
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/test_util.h"
+
+#define ABORT_ON_FAILURE(expr)                     \

Review comment:
       I think we have `ASSERT_OK` (for tests), `ABORT_NOT_OK` (for scripts and places where error checking isn't allowed) and `DCHECK` (for debug assertions).  I'm not sure we need a new macro.

##########
File path: cpp/src/arrow/dataset/discovery.cc
##########
@@ -270,7 +270,14 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
   for (const auto& info : files_) {
     auto fixed_path = StripPrefixAndFilename(info.path(), options_.partition_base_dir);
     ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
-    ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, partition));
+    std::shared_ptr<FileFragment> fragment;
+    if (format_->type_name() == "skyhook") {

Review comment:
       If skyhook has a different implementation it should be handled in the `MakeFragment`'s polymorphism.  This `if` statement shouldn't be needed.
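
   A rough sketch of the idea, using hypothetical stand-in types (`Format`, `SkyhookLikeFormat`, `Fragment`, `Discover`) rather than the real `FileFormat`/`FileFragment` signatures: the format-specific behavior lives in a virtual `MakeFragment` override, so the discovery loop never has to compare `type_name()` strings.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for a fragment; `kind` only records who built it.
struct Fragment {
  std::string path;
  std::string kind;
};

// Hypothetical stand-in for FileFormat.
class Format {
 public:
  virtual ~Format() = default;

  // Each format decides how its own fragments are built.
  virtual std::unique_ptr<Fragment> MakeFragment(const std::string& path) const {
    return std::unique_ptr<Fragment>(new Fragment{path, "default"});
  }
};

// A format with special needs overrides MakeFragment instead of being
// special-cased by its callers.
class SkyhookLikeFormat : public Format {
 public:
  std::unique_ptr<Fragment> MakeFragment(const std::string& path) const override {
    return std::unique_ptr<Fragment>(new Fragment{path, "skyhook"});
  }
};

// The discovery code stays format-agnostic: no type_name() comparison.
std::unique_ptr<Fragment> Discover(const Format& format, const std::string& path) {
  return format.MakeFragment(path);
}
```

   Calling `Discover` with a `SkyhookLikeFormat` dispatches to the override, so the caller needs no `if` on the format's name.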

##########
File path: cpp/src/arrow/dataset/file_base.h
##########
@@ -199,18 +203,23 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
   const FileSource& source() const { return source_; }
   const std::shared_ptr<FileFormat>& format() const { return format_; }
 
+  const std::shared_ptr<Schema>& dataset_schema() const { return dataset_schema_; }
+
  protected:
   FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                compute::Expression partition_expression,
-               std::shared_ptr<Schema> physical_schema)
+               std::shared_ptr<Schema> physical_schema,
+               std::shared_ptr<Schema> dataset_schema)
       : Fragment(std::move(partition_expression), std::move(physical_schema)),
         source_(std::move(source)),
-        format_(std::move(format)) {}
+        format_(std::move(format)),
+        dataset_schema_(std::move(dataset_schema)) {}
 
   Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
 
   FileSource source_;
   std::shared_ptr<FileFormat> format_;
+  std::shared_ptr<Schema> dataset_schema_;

Review comment:
       As best I can tell the changes in `file_base.h` and `file_base.cc` seem to be around populating this `dataset_schema_` which is later used when creating a scan task.  However, when creating a scan task, the dataset schema should be accessible via the `ScanOptions`.  Can you help me understand why these changes are needed?

##########
File path: cpp/src/arrow/dataset/file_skyhook.h
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This API is EXPERIMENTAL.
+#define _FILE_OFFSET_BITS 64
+
+#pragma once
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/discovery.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/rados.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/io/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/macros.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/exception.h"
+
+#define SCAN_ERR_CODE 25
+#define SCAN_ERR_MSG "failed to scan file fragment"
+
+#define SCAN_REQ_DESER_ERR_CODE 26
+#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
+
+#define SCAN_RES_SER_ERR_CODE 27
+#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
+
+namespace arrow {
+namespace dataset {
+
+enum SkyhookFileType { PARQUET, IPC };
+
+/// \addtogroup dataset-file-formats
+///
+/// @{
+
+namespace connection {
+/// \brief An interface for general connections.
+class ARROW_DS_EXPORT Connection {
+ public:
+  virtual Status Connect() = 0;
+
+  Connection() = default;
+  virtual ~Connection() = default;
+};
+
+/// \class RadosConnection
+/// \brief An interface to connect to a Rados cluster and hold the connection
+/// information for usage in later stages.
+class ARROW_DS_EXPORT RadosConnection : public Connection {
+ public:
+  struct RadosConnectionCtx {
+    std::string ceph_config_path;
+    std::string data_pool;
+    std::string user_name;
+    std::string cluster_name;
+    std::string cls_name;
+
+    RadosConnectionCtx(const std::string& ceph_config_path, const std::string& data_pool,
+                       const std::string& user_name, const std::string& cluster_name,
+                       const std::string& cls_name)
+        : ceph_config_path(ceph_config_path),
+          data_pool(data_pool),
+          user_name(user_name),
+          cluster_name(cluster_name),
+          cls_name(cls_name) {}
+  };
+  explicit RadosConnection(const RadosConnectionCtx& ctx)
+      : Connection(),
+        ctx(ctx),
+        rados(new RadosWrapper()),
+        ioCtx(new IoCtxWrapper()),
+        connected(false) {}
+
+  ~RadosConnection();
+
+  /// \brief Connect to the Rados cluster.
+  /// \return Status.
+  Status Connect();
+
+  /// \brief Shutdown the connection to the Rados cluster.
+  /// \return Status.
+  Status Shutdown();
+
+  RadosConnectionCtx ctx;
+  RadosInterface* rados;
+  IoCtxInterface* ioCtx;
+  bool connected;
+};
+
+}  // namespace connection
+
+/// \class SkyhookDirectObjectAccess
+/// \brief Interface for translating the name of a file in CephFS to its
+/// corresponding object ID in RADOS assuming 1:1 mapping between a file
+/// and its underlying object.
+class ARROW_DS_EXPORT SkyhookDirectObjectAccess {
+ public:
+  explicit SkyhookDirectObjectAccess(
+      const std::shared_ptr<connection::RadosConnection>& connection)
+      : connection_(std::move(connection)) {}
+
+  /// \brief Executes the POSIX stat call on a file.
+  /// \param[in] path Path of the file.
+  /// \param[out] st Reference to the struct object to store the result.
+  /// \return Status.
+  Status Stat(const std::string& path, struct stat& st) {
+    struct stat file_st;
+    if (stat(path.c_str(), &file_st) < 0)
+      return Status::Invalid("stat returned non-zero exit code.");
+    st = file_st;
+    return Status::OK();
+  }
+
+  // Helper function to convert Inode to ObjectID because Rados calls work with
+  // ObjectIDs.
+  std::string ConvertFileInodeToObjectID(uint64_t inode) {
+    std::stringstream ss;
+    ss << std::hex << inode;
+    std::string oid(ss.str() + ".00000000");

Review comment:
       Maybe add a comment or some explanation behind this magic string?
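
   For illustration, one way the suffix could be named and documented.  This is a sketch under an assumption the PR does not confirm: that the suffix is the index of the object within the file, written as eight zero-padded hex digits, so a file stored as a single object always ends in index 0.  The constant name `kFirstObjectSuffix` is hypothetical.

```cpp
#include <cstdint>
#include <sstream>
#include <string>

// Assumption: object names have the form "<inode in hex>.<object index>",
// where the index is eight zero-padded hex digits. With a 1:1 mapping between
// a file and its object, the only object is index 0.
constexpr char kFirstObjectSuffix[] = ".00000000";

// Converts a file inode to the ID of the file's first (and only) object.
std::string ConvertFileInodeToObjectID(uint64_t inode) {
  std::stringstream ss;
  ss << std::hex << inode;  // the inode is written in lowercase hex, unpadded
  return ss.str() + kFirstObjectSuffix;
}
```

   Under that assumption, inode `0x10000000000` maps to the object ID `10000000000.00000000`.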

##########
File path: cpp/src/arrow/dataset/rados.cc
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       That could work, although it wouldn't be my first conclusion.  Elsewhere the file_xyz files are about bridging the gap between a specific format (which has readers and writers in an `arrow/parquet`, `arrow/csv`, or `arrow/ipc` directory; these formats know nothing of datasets) and the generic datasets API (e.g. file_base, dataset.h, etc.; these common files know nothing about any specific format).
   
   So my thinking was more that you might not have enough in just one file to create an `arrow/rados` directory but the format-specific files should go somewhere other than `arrow/dataset`.  At a quick inventory we have:
   
   Types that help when interacting with rados as a client (in rados.cc/rados.h).  These are analogous to our hdfs or s3 client libraries, which live in `src/arrow/filesystem`.  I think `RadosConnection` and `SkyhookDirectObjectAccess` belong here as well (this might have been your point).  These types can stand on their own as Ceph integration types even if skyhook doesn't exist.
   
   Types that define the "skyhook protocol" (the flatbuffer files, the global/static methods `SerializeScanRequest`, `DeserializeScanRequest`, `SerializeTable`, and `DeserializeTable`).  Personally, I think these might best live in the same directory as the cls.
   
   Types that bridge the dataset API and the skyhook API (this would be `SkyhookFileFormat` and it uses types from both of the above categories).
   



