You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/26 22:28:38 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #13401: ARROW-16855: [C++] Adding Read Relation ToProto

westonpace commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r956249655


##########
cpp/src/arrow/engine/substrait/plan_internal.cc:
##########
@@ -133,5 +136,18 @@ Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
                             registry);
 }
 
+Result<std::unique_ptr<substrait::Plan>> PlanToProto(

Review Comment:
   I like the name `PlanToProto` better but, for consistency, I think this should be named `ToProto` right?



##########
cpp/src/arrow/engine/substrait/plan_internal.h:
##########
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
     const substrait::Plan& plan,
     const ExtensionIdRegistry* registry = default_extension_id_registry());
 
+/// \brief Serializes Declaration and produces a substrait::Plan.
+///
+/// Note that, this is a part of roundtripping test API and not
+/// designed to use in production
+/// \param[in] declr the sequence of declarations
+/// \param[in, out] ext_set the extension set to be updated
+/// \param[in] conversion_options the conversion options useful for the serialization

Review Comment:
   ```suggestion
   /// \param[in] conversion_options options to control serialization behavior
   ```



##########
cpp/src/arrow/engine/substrait/plan_internal.h:
##########
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
     const substrait::Plan& plan,
     const ExtensionIdRegistry* registry = default_extension_id_registry());
 
+/// \brief Serializes Declaration and produces a substrait::Plan.
+///
+/// Note that, this is a part of roundtripping test API and not
+/// designed to use in production
+/// \param[in] declr the sequence of declarations

Review Comment:
   ```suggestion
   /// \param[in] declr the sequence of declarations to be serialized
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options));
+  }
+  const auto& factory_name = declaration.factory_name;
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  // Note that the sink declaration factory doesn't exist for serialization as
+  // Substrait doesn't deal with a sink node definition
+  std::unique_ptr<substrait::Rel> factory_rel;
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set,
+                                                             conversion_options));
+  } else if (factory_name == "filter") {
+    ARROW_ASSIGN_OR_RAISE(
+        factory_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));

Review Comment:
   It seems like it would be slightly clearer to just do:
   
   ```
   rel->set_allocated_filter(factory_rel.release());
   ```
   
   Then you could get rid of `SetRelation` and skip a long if/else loop.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options));

Review Comment:
   I'm not sure this is valid.  Usually you'd want to do something like...
   
   ```
   auto input_rel = internal::make_unique<substrait::Rel>();
   RETURN_NOT_OK(
       SerializeAndCombineRelations(input_decl, ext_set, input_rel, conversion_options));
   rel->set_allocated_input(input_rel.release());
   ```
   
   Sadly, there is no guarantee that `rel` has `input` (e.g. `JoinRel` has `left` and `right`) so I think this input handling will need to be inside the below if/else loop.
   
   Right now I think you are setting the `oneof` inside of `rel` multiple times.



##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -202,6 +207,18 @@ Result<std::shared_ptr<Buffer>> SerializeExpression(
     const compute::Expression& expr, ExtensionSet* ext_set,
     const ConversionOptions& conversion_options = {});
 
+/// \brief Serializes an Arrow compute Declaration to a Substrait Relation message
+///
+/// \param[in] declaration the Arrow compute declaration to serialize
+/// \param[in,out] ext_set the extension mapping to use; may be updated to add

Review Comment:
   I'm not sure what `; may be updated to add` means.



##########
cpp/src/arrow/engine/substrait/relation_internal.h:
##########
@@ -40,9 +40,46 @@ struct DeclarationInfo {
   int num_columns;
 };
 
+/// \brief A function to extract Acero Declaration from a Substrait Rel object
 ARROW_ENGINE_EXPORT
 Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
                                   const ConversionOptions&);
 
+/// \brief Serializes a Declaration, produce a Substrait Rel and update the global
+/// Substrait plan. A Substrait Rel is passed as a the plan and it is updated with
+/// corresponding Declaration passed for serialization.
+///
+/// Note that this is a rather a helper method useful to fuse a partially serialized
+/// plan with another plan. The reason for having a partially serialized plan is to
+/// avoid unnecessary complication and enable partial plan serialization without
+/// affecting a global plan. Since kept as unique_ptr resources are relased efficiently
+/// upon releasing for the global plan.
+ARROW_ENGINE_EXPORT Status SerializeAndCombineRelations(const compute::Declaration&,
+                                                        ExtensionSet*,
+                                                        std::unique_ptr<substrait::Rel>&,
+                                                        const ConversionOptions&);
+
+/// \brief Serialize a Declaration and produces a Substrait Rel.
+///
+/// Note that in order to provide a generic interface for ToProto for
+/// declaration it is not specialized for each relation within the Substrait Rel.
+/// Rather a serialized relation is set as a member for the Substrait Rel
+/// (partial Relation) which is later on extracted to update a Substrait Rel
+/// which would be included in the fully serialized Acero Exec Plan.
+/// The ExecNode or ExecPlan is not used in this context as Declaration is preferred
+/// in the Substrait space rather than internal components of Acero execution engine.
+ARROW_ENGINE_EXPORT Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration&, ExtensionSet*, const ConversionOptions&);
+
+/// \brief Acero to Substrait converter for Acero scan relation.
+ARROW_ENGINE_EXPORT Result<std::unique_ptr<substrait::Rel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>&, const compute::Declaration&, ExtensionSet*,
+    const ConversionOptions&);
+
+/// \brief Acero to Substrait converter for Acero filter relation.
+ARROW_ENGINE_EXPORT Result<std::unique_ptr<substrait::Rel>> FilterRelationConverter(
+    const std::shared_ptr<Schema>&, const compute::Declaration&, ExtensionSet*,
+    const ConversionOptions&);

Review Comment:
   These methods should not be in the `.h` file and should be in an anonymous namespace in the `.cc` file.



##########
cpp/src/arrow/engine/substrait/plan_internal.h:
##########
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
     const substrait::Plan& plan,
     const ExtensionIdRegistry* registry = default_extension_id_registry());
 
+/// \brief Serializes Declaration and produces a substrait::Plan.

Review Comment:
   ```suggestion
   /// \brief Serialize a declaration and into a substrait::Plan.
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -42,9 +55,35 @@ using testing::UnorderedElementsAre;
 namespace arrow {
 
 using internal::checked_cast;
-
+using internal::hash_combine;
 namespace engine {
 
+Status WriteParquetData(const std::string& path,
+                        const std::shared_ptr<fs::FileSystem> file_system,
+                        const std::shared_ptr<Table> input) {
+  EXPECT_OK_AND_ASSIGN(auto buffer_writer, file_system->OpenOutputStream(path));
+  PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*input, arrow::default_memory_pool(),
+                                                  buffer_writer, /*chunk_size*/ 1));
+  return buffer_writer->Close();
+}

Review Comment:
   Substrait supports IPC now.  Can we write this test with IPC?



##########
cpp/src/arrow/engine/substrait/plan_internal.h:
##########
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
     const substrait::Plan& plan,
     const ExtensionIdRegistry* registry = default_extension_id_registry());
 
+/// \brief Serializes Declaration and produces a substrait::Plan.
+///
+/// Note that, this is a part of roundtripping test API and not
+/// designed to use in production
+/// \param[in] declr the sequence of declarations
+/// \param[in, out] ext_set the extension set to be updated
+/// \param[in] conversion_options the conversion options useful for the serialization
+/// \return serialized Acero plan

Review Comment:
   ```suggestion
   /// \return the serialized plan
   ```



##########
cpp/src/arrow/engine/substrait/plan_internal.cc:
##########
@@ -133,5 +136,18 @@ Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
                             registry);
 }
 
+Result<std::unique_ptr<substrait::Plan>> PlanToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto subs_plan = internal::make_unique<substrait::Plan>();
+  auto plan_rel = internal::make_unique<substrait::PlanRel>();
+  auto rel = internal::make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));

Review Comment:
   Why not use the `ToProto` method you just added to `relation_internal.cc`?



##########
cpp/src/arrow/engine/substrait/plan_internal.cc:
##########
@@ -133,5 +136,18 @@ Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
                             registry);
 }
 
+Result<std::unique_ptr<substrait::Plan>> PlanToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto subs_plan = internal::make_unique<substrait::Plan>();
+  auto plan_rel = internal::make_unique<substrait::PlanRel>();
+  auto rel = internal::make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  plan_rel->set_allocated_rel(rel.release());

Review Comment:
   This should be `set_allocated_root` I think.



##########
cpp/src/arrow/engine/substrait/plan_internal.cc:
##########
@@ -17,6 +17,9 @@
 
 #include "arrow/engine/substrait/plan_internal.h"
 
+#include "arrow/dataset/plan.h"
+#include "arrow/dataset/scanner.h"

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options));
+  }
+  const auto& factory_name = declaration.factory_name;
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  // Note that the sink declaration factory doesn't exist for serialization as
+  // Substrait doesn't deal with a sink node definition
+  std::unique_ptr<substrait::Rel> factory_rel;
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set,
+                                                             conversion_options));
+  } else if (factory_name == "filter") {
+    ARROW_ASSIGN_OR_RAISE(
+        factory_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
+  } else {
+    return Status::NotImplemented("Factory ", factory_name,
+                                  " not implemented for roundtripping.");
+  }
+
+  if (factory_rel != nullptr) {
+    RETURN_NOT_OK(SetRelation(rel, factory_rel, factory_name));
+  } else {
+    return Status::Invalid("Conversion on factory ", factory_name,
+                           " returned an invalid relation");
+  }

Review Comment:
   Why would `SetRelation` return `nullptr` instead of just returning an invalid status?



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options));
+  }
+  const auto& factory_name = declaration.factory_name;
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  // Note that the sink declaration factory doesn't exist for serialization as
+  // Substrait doesn't deal with a sink node definition
+  std::unique_ptr<substrait::Rel> factory_rel;
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set,
+                                                             conversion_options));
+  } else if (factory_name == "filter") {
+    ARROW_ASSIGN_OR_RAISE(
+        factory_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
+  } else {
+    return Status::NotImplemented("Factory ", factory_name,
+                                  " not implemented for roundtripping.");
+  }
+
+  if (factory_rel != nullptr) {
+    RETURN_NOT_OK(SetRelation(rel, factory_rel, factory_name));
+  } else {
+    return Status::Invalid("Conversion on factory ", factory_name,
+                           " returned an invalid relation");
+  }
+  return Status::OK();
+}
+
+Result<std::unique_ptr<substrait::Rel>> GetRelationFromDeclaration(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto declr_input = declaration.inputs[0];
+  // Note that the input is expected in declaration.
+  // ExecNode inputs are not accepted
+  if (util::get_if<compute::ExecNode*>(&declr_input)) {
+    return Status::NotImplemented("Only support Plans written in Declaration format.");
+  }
+  return ToProto(util::get<compute::Declaration>(declr_input), ext_set,
+                 conversion_options);
+}
+
+Result<std::unique_ptr<substrait::Rel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+  auto dataset =
+      dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid("Can only convert file system datasets to a Substrait plan.");

Review Comment:
   ```suggestion
       return Status::Invalid("Can only convert scan node with FileSystemDataset to a Substrait plan.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.h:
##########
@@ -40,9 +40,46 @@ struct DeclarationInfo {
   int num_columns;
 };
 
+/// \brief A function to extract Acero Declaration from a Substrait Rel object
 ARROW_ENGINE_EXPORT
 Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
                                   const ConversionOptions&);
 
+/// \brief Serializes a Declaration, produce a Substrait Rel and update the global
+/// Substrait plan. A Substrait Rel is passed as a the plan and it is updated with
+/// corresponding Declaration passed for serialization.
+///
+/// Note that this is a rather a helper method useful to fuse a partially serialized
+/// plan with another plan. The reason for having a partially serialized plan is to
+/// avoid unnecessary complication and enable partial plan serialization without
+/// affecting a global plan. Since kept as unique_ptr resources are relased efficiently
+/// upon releasing for the global plan.
+ARROW_ENGINE_EXPORT Status SerializeAndCombineRelations(const compute::Declaration&,
+                                                        ExtensionSet*,
+                                                        std::unique_ptr<substrait::Rel>&,
+                                                        const ConversionOptions&);
+
+/// \brief Serialize a Declaration and produces a Substrait Rel.

Review Comment:
   ```suggestion
   /// \brief Convert an Acero Declaration to a Substrait Rel
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options));
+  }
+  const auto& factory_name = declaration.factory_name;
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  // Note that the sink declaration factory doesn't exist for serialization as
+  // Substrait doesn't deal with a sink node definition
+  std::unique_ptr<substrait::Rel> factory_rel;
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set,
+                                                             conversion_options));
+  } else if (factory_name == "filter") {
+    ARROW_ASSIGN_OR_RAISE(
+        factory_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
+  } else {
+    return Status::NotImplemented("Factory ", factory_name,
+                                  " not implemented for roundtripping.");
+  }
+
+  if (factory_rel != nullptr) {
+    RETURN_NOT_OK(SetRelation(rel, factory_rel, factory_name));
+  } else {
+    return Status::Invalid("Conversion on factory ", factory_name,
+                           " returned an invalid relation");
+  }
+  return Status::OK();
+}
+
+Result<std::unique_ptr<substrait::Rel>> GetRelationFromDeclaration(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto declr_input = declaration.inputs[0];
+  // Note that the input is expected in declaration.
+  // ExecNode inputs are not accepted
+  if (util::get_if<compute::ExecNode*>(&declr_input)) {
+    return Status::NotImplemented("Only support Plans written in Declaration format.");
+  }
+  return ToProto(util::get<compute::Declaration>(declr_input), ext_set,
+                 conversion_options);
+}

Review Comment:
   What is this method doing?  I'm not sure I agree that `declaration.inputs[0]` is always safe.



##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -36,6 +36,11 @@
 namespace arrow {
 namespace engine {
 
+ARROW_ENGINE_EXPORT
+Result<std::shared_ptr<Buffer>> SerializePlan(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options = {});
+

Review Comment:
   Needs a doc comment



##########
cpp/src/arrow/engine/substrait/relation_internal.h:
##########
@@ -40,9 +40,46 @@ struct DeclarationInfo {
   int num_columns;
 };
 
+/// \brief A function to extract Acero Declaration from a Substrait Rel object
 ARROW_ENGINE_EXPORT
 Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
                                   const ConversionOptions&);
 
+/// \brief Serializes a Declaration, produce a Substrait Rel and update the global
+/// Substrait plan. A Substrait Rel is passed as a the plan and it is updated with
+/// corresponding Declaration passed for serialization.
+///
+/// Note that this is a rather a helper method useful to fuse a partially serialized
+/// plan with another plan. The reason for having a partially serialized plan is to
+/// avoid unnecessary complication and enable partial plan serialization without
+/// affecting a global plan. Since kept as unique_ptr resources are relased efficiently
+/// upon releasing for the global plan.

Review Comment:
   ```suggestion
   /// \brief Convert a Declaration (and its inputs) to a Substrait Rel
   ///
   /// A Substrait Rel is passed as a the plan and it is updated with
   /// corresponding Declaration passed for serialization.
   ///
   /// Note that this is a rather a helper method useful to fuse a partially serialized
   /// plan with another plan. The reason for having a partially serialized plan is to
   /// avoid unnecessary complication and enable partial plan serialization without
   /// affecting a global plan. Since kept as unique_ptr resources are relased efficiently
   /// upon releasing for the global plan.
   ```
   
   If this is a helper method does it need to appear in the `.h` file?



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options));
+  }
+  const auto& factory_name = declaration.factory_name;
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  // Note that the sink declaration factory doesn't exist for serialization as
+  // Substrait doesn't deal with a sink node definition
+  std::unique_ptr<substrait::Rel> factory_rel;
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set,
+                                                             conversion_options));
+  } else if (factory_name == "filter") {
+    ARROW_ASSIGN_OR_RAISE(
+        factory_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
+  } else {
+    return Status::NotImplemented("Factory ", factory_name,
+                                  " not implemented for roundtripping.");
+  }
+
+  if (factory_rel != nullptr) {
+    RETURN_NOT_OK(SetRelation(rel, factory_rel, factory_name));
+  } else {
+    return Status::Invalid("Conversion on factory ", factory_name,
+                           " returned an invalid relation");
+  }
+  return Status::OK();
+}
+
+Result<std::unique_ptr<substrait::Rel>> GetRelationFromDeclaration(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto declr_input = declaration.inputs[0];
+  // Note that the input is expected in declaration.
+  // ExecNode inputs are not accepted
+  if (util::get_if<compute::ExecNode*>(&declr_input)) {
+    return Status::NotImplemented("Only support Plans written in Declaration format.");
+  }
+  return ToProto(util::get<compute::Declaration>(declr_input), ext_set,
+                 conversion_options);
+}
+
+Result<std::unique_ptr<substrait::Rel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+  auto dataset =
+      dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid("Can only convert file system datasets to a Substrait plan.");
+  }
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct,
+                        ToProto(*dataset->schema(), ext_set, conversion_options));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
+  for (const auto& file : dataset->files()) {
+    auto read_rel_lfs_ffs = make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+    read_rel_lfs_ffs->set_uri_path("file://" + file);
+    // set file format
+    // arrow and feather are temporarily handled via the Parquet format until
+    // upgraded to the latest Substrait version.

Review Comment:
   We have upgraded to the latest Substrait version. You are calling `set_allocated_arrow` and `set_allocated_orc` below.  I'm not sure this comment applies?



##########
cpp/src/arrow/engine/substrait/relation_internal.h:
##########
@@ -40,9 +40,46 @@ struct DeclarationInfo {
   int num_columns;
 };
 
+/// \brief A function to extract Acero Declaration from a Substrait Rel object
 ARROW_ENGINE_EXPORT
 Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
                                   const ConversionOptions&);
 
+/// \brief Serializes a Declaration, produce a Substrait Rel and update the global
+/// Substrait plan. A Substrait Rel is passed as a the plan and it is updated with
+/// corresponding Declaration passed for serialization.
+///
+/// Note that this is a rather a helper method useful to fuse a partially serialized
+/// plan with another plan. The reason for having a partially serialized plan is to
+/// avoid unnecessary complication and enable partial plan serialization without
+/// affecting a global plan. Since kept as unique_ptr resources are relased efficiently
+/// upon releasing for the global plan.
+ARROW_ENGINE_EXPORT Status SerializeAndCombineRelations(const compute::Declaration&,
+                                                        ExtensionSet*,
+                                                        std::unique_ptr<substrait::Rel>&,

Review Comment:
   ```suggestion
                                                           std::unique_ptr<substrait::Rel>*,
   ```
   Arrow's style guide forbids the use of mutable references for out-parameters.  You should use a pointer here.



##########
cpp/src/arrow/engine/substrait/relation_internal.h:
##########
@@ -40,9 +40,46 @@ struct DeclarationInfo {
   int num_columns;
 };
 
+/// \brief A function to extract Acero Declaration from a Substrait Rel object
 ARROW_ENGINE_EXPORT
 Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
                                   const ConversionOptions&);
 
+/// \brief Serializes a Declaration, produce a Substrait Rel and update the global
+/// Substrait plan. A Substrait Rel is passed as a the plan and it is updated with
+/// corresponding Declaration passed for serialization.
+///
+/// Note that this is a rather a helper method useful to fuse a partially serialized
+/// plan with another plan. The reason for having a partially serialized plan is to
+/// avoid unnecessary complication and enable partial plan serialization without
+/// affecting a global plan. Since kept as unique_ptr resources are relased efficiently
+/// upon releasing for the global plan.
+ARROW_ENGINE_EXPORT Status SerializeAndCombineRelations(const compute::Declaration&,
+                                                        ExtensionSet*,
+                                                        std::unique_ptr<substrait::Rel>&,
+                                                        const ConversionOptions&);
+
+/// \brief Serialize a Declaration and produces a Substrait Rel.
+///
+/// Note that in order to provide a generic interface for ToProto for
+/// declaration it is not specialized for each relation within the Substrait Rel.
+/// Rather a serialized relation is set as a member for the Substrait Rel
+/// (partial Relation) which is later on extracted to update a Substrait Rel
+/// which would be included in the fully serialized Acero Exec Plan.
+/// The ExecNode or ExecPlan is not used in this context as Declaration is preferred
+/// in the Substrait space rather than internal components of Acero execution engine.

Review Comment:
   I'm not sure I understand this paragraph



##########
cpp/src/arrow/engine/substrait/relation_internal.h:
##########
@@ -40,9 +40,46 @@ struct DeclarationInfo {
   int num_columns;
 };
 
+/// \brief A function to extract Acero Declaration from a Substrait Rel object

Review Comment:
   ```suggestion
   /// \brief Convert a Substrait Rel object to an Acero declaration
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options));
+  }
+  const auto& factory_name = declaration.factory_name;
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  // Note that the sink declaration factory doesn't exist for serialization as
+  // Substrait doesn't deal with a sink node definition
+  std::unique_ptr<substrait::Rel> factory_rel;
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set,
+                                                             conversion_options));
+  } else if (factory_name == "filter") {
+    ARROW_ASSIGN_OR_RAISE(
+        factory_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
+  } else {
+    return Status::NotImplemented("Factory ", factory_name,
+                                  " not implemented for roundtripping.");
+  }
+
+  if (factory_rel != nullptr) {
+    RETURN_NOT_OK(SetRelation(rel, factory_rel, factory_name));
+  } else {
+    return Status::Invalid("Conversion on factory ", factory_name,
+                           " returned an invalid relation");
+  }
+  return Status::OK();
+}
+
+Result<std::unique_ptr<substrait::Rel>> GetRelationFromDeclaration(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto declr_input = declaration.inputs[0];
+  // Note that the input is expected in declaration.
+  // ExecNode inputs are not accepted
+  if (util::get_if<compute::ExecNode*>(&declr_input)) {
+    return Status::NotImplemented("Only support Plans written in Declaration format.");
+  }
+  return ToProto(util::get<compute::Declaration>(declr_input), ext_set,
+                 conversion_options);
+}
+
+Result<std::unique_ptr<substrait::Rel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+  auto dataset =
+      dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid("Can only convert file system datasets to a Substrait plan.");
+  }
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct,
+                        ToProto(*dataset->schema(), ext_set, conversion_options));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
+  for (const auto& file : dataset->files()) {
+    auto read_rel_lfs_ffs = make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+    read_rel_lfs_ffs->set_uri_path("file://" + file);

Review Comment:
   We should probably add a method in `uri.h` to create a file URI from a path.



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1731,5 +1808,265 @@ TEST(Substrait, AggregateWithFilter) {
   ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return kNullConsumer; }));
 }
 
+TEST(Substrait, BasicPlanRoundTripping) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  compute::ExecContext exec_context;
+  ExtensionSet ext_set;
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 3],
+      [3, 1, 3],
+      [1, 2, 5]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 12]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.parquet";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_tempdir"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToString();
+
+  // Note: there is an additional forward slash introduced by the tempdir
+  // it must be replaced to properly load into reading files
+  // TODO: (Review: Jira needs to be reported to handle this properly)

Review Comment:
   Can you give an example of what the path looks like before you do the replace?  I'm not sure I follow.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -29,10 +29,20 @@
 #include "arrow/filesystem/localfs.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/util_internal.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
 
 namespace arrow {
+
+using internal::checked_cast;
+using internal::make_unique;
+
 namespace engine {
 
+using SubstraitConverter = std::function<Result<std::unique_ptr<substrait::Rel>>(
+    const std::shared_ptr<Schema>&, const compute::Declaration&, ExtensionSet*,
+    const ConversionOptions&)>;
+

Review Comment:
   I think you can get rid of this now that we aren't doing a registry approach?



##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -202,6 +207,18 @@ Result<std::shared_ptr<Buffer>> SerializeExpression(
     const compute::Expression& expr, ExtensionSet* ext_set,
     const ConversionOptions& conversion_options = {});
 
+/// \brief Serializes an Arrow compute Declaration to a Substrait Relation message
+///
+/// \param[in] declaration the Arrow compute declaration to serialize
+/// \param[in,out] ext_set the extension mapping to use; may be updated to add
+/// \param[in] conversion_options options to control how the conversion is done
+/// mappings for the components in the used declaration
+/// \return a buffer containing the protobuf serialization of the corresponding Substrait
+/// relation message

Review Comment:
   ```suggestion
   /// \brief Serialize an Acero Declaration to a binary protobuf Substrait message
   ///
   /// \param[in] declaration the Acero declaration to serialize
   /// \param[in,out] ext_set the extension mapping to use; may be updated to add
   /// \param[in] conversion_options options to control how the conversion is done
   ///
   /// \return a buffer containing the protobuf serialization of the Acero relation
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org