Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/03/21 05:07:16 UTC

[GitHub] [arrow] westonpace opened a new pull request, #34651: GH-32763: [C++] Add FromProto for fetch & sort

westonpace opened a new pull request, #34651:
URL: https://github.com/apache/arrow/pull/34651

   This does not support the clustered sort direction, custom sort functions, or complex (non-reference) expressions as sort keys.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] danepitkin commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "danepitkin (via GitHub)" <gi...@apache.org>.
danepitkin commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1546091338

   With Arrow v12 released (🎉 ) you should be all clear to merge at your leisure!



[GitHub] [arrow] westonpace commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1502355005

   Rebased.



[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1167300235


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;
+      compute::NullPlacement null_placement;
+      bool first = true;
+      for (const auto& sort : sort.sorts()) {
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED) {
+          return Status::Invalid(
+              "substrait::SortRel with sort that had unspecified direction");
+        }
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_CLUSTERED) {
+          return Status::NotImplemented(
+              "substrait::SortRel with sort with clustered sort direction");
+        }
+        // Substrait allows null placement to differ for each field.  Acero expects it to
+        // be consistent across all fields.  So we grab the null placement from the first
+        // key and verify all other keys have the same null placement
+        if (first) {
+          null_placement = IsSortNullsFirst(sort.direction())
+                               ? compute::NullPlacement::AtStart
+                               : compute::NullPlacement::AtEnd;
+        } else {
+          if ((null_placement == compute::NullPlacement::AtStart &&
+               !IsSortNullsFirst(sort.direction())) ||
+              (null_placement == compute::NullPlacement::AtEnd &&
+               IsSortNullsFirst(sort.direction()))) {
+            return Status::NotImplemented(
+                "substrait::SortRel with ordering with mixed null placement");
+          }
+        }
+        if (sort.sort_kind_case() != substrait::SortField::SortKindCase::kDirection) {
+          return Status::NotImplemented("substrait::SortRel with custom sort function");

Review Comment:
   At the moment we are constructing Substrait plans "by hand", so it's not too hard to come up with the test, but it is a maintenance and readability burden.  I don't think we've been super consistent.  I skipped this one just because I think it'll be a while until we have producers that come up with this kind of plan.  Right now I'm trying to balance "tons of hard-coded JSON plans" against "full coverage of possible plans".  Once the text format is ready enough that we can switch to it, I think that will help some.
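
The null-placement rule described in the hunk above (take the placement from the first key, then reject any later key that disagrees) can be shown in isolation. This is only a sketch under stated assumptions: `NullPlacement` here is a local stand-in for `compute::NullPlacement`, and `CommonNullPlacement` is a hypothetical helper that is not part of the PR.

```cpp
#include <cassert>
#include <optional>
#include <vector>

// Local stand-in for compute::NullPlacement (assumption, not the Arrow type).
enum class NullPlacement { AtStart, AtEnd };

// Hypothetical helper: given one "nulls first?" flag per sort key, return the
// placement shared by all keys, or std::nullopt when the list is empty or the
// keys disagree (the case the PR reports as NotImplemented).
std::optional<NullPlacement> CommonNullPlacement(
    const std::vector<bool>& nulls_first_per_key) {
  if (nulls_first_per_key.empty()) return std::nullopt;
  const bool first = nulls_first_per_key.front();
  for (bool nulls_first : nulls_first_per_key) {
    if (nulls_first != first) return std::nullopt;  // mixed null placement
  }
  return first ? NullPlacement::AtStart : NullPlacement::AtEnd;
}
```

A helper shaped like this could be exercised directly with plain vectors, which is one possible way to cover a rejected case without a hand-written JSON plan.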




[GitHub] [arrow] westonpace commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1509329073

   I'll merge this when green but I don't think there is any significant rush to get this into 12.0.0?



[GitHub] [arrow] github-actions[bot] commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1477292126

   * Closes: #32763



[GitHub] [arrow] vibhatha commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "vibhatha (via GitHub)" <gi...@apache.org>.
vibhatha commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162242746


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;

Review Comment:
   should we `reserve` here?
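
For context on the question, here is a minimal sketch of what reserving would look like. `SortKeySketch` and `BuildKeys` are hypothetical stand-ins, not the PR's `compute::SortKey` or its loop; in the quoted code the element count would presumably come from `sort.sorts_size()`.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for compute::SortKey.
struct SortKeySketch {
  std::string target;
  bool ascending;
};

// Builds one key per name. Because the final size is known up front,
// reserve() lets the vector allocate once instead of growing repeatedly.
std::vector<SortKeySketch> BuildKeys(const std::vector<std::string>& names) {
  std::vector<SortKeySketch> keys;
  keys.reserve(names.size());
  for (const auto& name : names) {
    keys.push_back({name, /*ascending=*/true});
  }
  return keys;
}
```

With only a handful of sort keys per relation the saving is likely small, so this is mostly a matter of habit and readability.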




[GitHub] [arrow] westonpace commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1548413981

   Also, I see I missed some feedback from @vibhatha so I will try and get to that today.



[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1198949600


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -5145,6 +5145,128 @@ TEST(Substrait, CompoundEmitWithFilter) {
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }
 
+TEST(Substrait, SortAndFetch) {
+  // Sort by A, ascending, take items [2, 5), then sort by B descending
+  std::string substrait_json = R"({
+    "version": {
+        "major_number": 9999,
+        "minor_number": 9999,
+        "patch_number": 9999
+    },
+    "relations": [
+        {
+            "rel": {
+                "sort": {
+                    "input": {
+                        "fetch": {
+                            "input": {
+                                "sort": {
+                                    "input": {
+                                        "read": {
+                                            "base_schema": {
+                                                "names": [
+                                                    "A",
+                                                    "B"
+                                                ],
+                                                "struct": {
+                                                    "types": [
+                                                        {
+                                                            "i32": {}
+                                                        },
+                                                        {
+                                                            "i32": {}
+                                                        }
+                                                    ]
+                                                }
+                                            },
+                                            "namedTable": {
+                                                "names": [
+                                                    "table"
+                                                ]
+                                            }
+                                        }
+                                    },
+                                    "sorts": [
+                                        {
+                                            "expr": {
+                                                "selection": {
+                                                    "directReference": {
+                                                        "structField": {
+                                                            "field": 0
+                                                        }
+                                                    },
+                                                    "rootReference": {}
+                                                }
+                                            },
+                                            "direction": "SORT_DIRECTION_ASC_NULLS_FIRST"
+                                        }
+                                    ]
+                                }
+                            },
+                            "offset": 2,
+                            "count": 3
+                        }
+                    },
+                    "sorts": [
+                        {
+                            "expr": {
+                                "selection": {
+                                    "directReference": {
+                                        "structField": {
+                                            "field": 1
+                                        }
+                                    },
+                                    "rootReference": {}
+                                }
+                            },
+                            "direction": "SORT_DIRECTION_DESC_NULLS_LAST"
+                        }
+                    ]
+                }
+            }
+        }
+    ],
+    "extension_uris": [],
+    "extensions": []
+})";
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
+  auto test_schema = schema({field("A", int32()), field("B", int32())});
+
+  auto input_table = TableFromJSON(test_schema, {R"([
+      [null, null],
+      [5, 8],
+      [null, null],
+      [null, null],
+      [3, 4],
+      [9, 6],
+      [4, 5]
+  ])"});
+
+  // First sort by A, ascending, nulls first to yield rows:
+  // 0, 2, 3, 4, 6, 1, 5
+  // Apply fetch to grab rows 3, 4, 6
+  // Then sort by B, descending, to yield rows 6, 4, 3
+
+  auto output_table = TableFromJSON(test_schema, {R"([
+    [4, 5],
+    [3, 4],
+    [null, null]
+  ])"});
+
+  NamedTableProvider table_provider = [&](const std::vector<std::string>& names,
+                                          const Schema&) {
+    std::shared_ptr<acero::ExecNodeOptions> options =
+        std::make_shared<acero::TableSourceNodeOptions>(input_table);
+    return acero::Declaration("table_source", {}, options, "mock_source");
+  };

Review Comment:
   Good cleanup, thanks.  I've switched to this.
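
The expected output in the quoted test can be checked by hand with a small standalone simulation of the sort, fetch, sort pipeline. This is a sketch that uses only the standard library; `Row`, `SortRows`, `Fetch`, and `RunSortAndFetchExample` are hypothetical names and do not go through Acero or Substrait at all.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical two-column row with nullable int32 values (not an Arrow type).
struct Row {
  std::optional<int> a;
  std::optional<int> b;
};

// Sorts rows on one nullable column with the given direction and null placement.
void SortRows(std::vector<Row>& rows, std::optional<int> Row::*col,
              bool ascending, bool nulls_first) {
  std::stable_sort(rows.begin(), rows.end(), [&](const Row& l, const Row& r) {
    const std::optional<int>& x = l.*col;
    const std::optional<int>& y = r.*col;
    if (!x.has_value() || !y.has_value()) {
      if (x.has_value() == y.has_value()) return false;  // both null: equal
      // Exactly one side is null: it goes first only when nulls_first is set.
      return x.has_value() ? !nulls_first : nulls_first;
    }
    return ascending ? *x < *y : *x > *y;
  });
}

// Returns rows [offset, offset + count), clamped to the input size.
std::vector<Row> Fetch(const std::vector<Row>& rows, std::size_t offset,
                       std::size_t count) {
  const std::size_t begin = std::min(offset, rows.size());
  const std::size_t end = std::min(begin + count, rows.size());
  return std::vector<Row>(rows.begin() + static_cast<std::ptrdiff_t>(begin),
                          rows.begin() + static_cast<std::ptrdiff_t>(end));
}

// Same input table and plan shape as the quoted test.
std::vector<Row> RunSortAndFetchExample() {
  std::vector<Row> rows = {{std::nullopt, std::nullopt}, {5, 8},
                           {std::nullopt, std::nullopt},
                           {std::nullopt, std::nullopt}, {3, 4}, {9, 6}, {4, 5}};
  SortRows(rows, &Row::a, /*ascending=*/true, /*nulls_first=*/true);
  rows = Fetch(rows, /*offset=*/2, /*count=*/3);
  SortRows(rows, &Row::b, /*ascending=*/false, /*nulls_first=*/false);
  return rows;
}
```

After the first sort the three all-null rows lead, so the fetch window [2, 5) holds one null row followed by `[3, 4]` and `[4, 5]`; sorting that window by B descending with nulls last gives `[4, 5]`, `[3, 4]`, `[null, null]`, matching `output_table` in the test.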




[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1198951183


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -329,6 +329,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;
+}
+
+compute::SortOrder SortOrderFromDirection(
+    const substrait::SortField::SortDirection& direction) {
+  if (direction < 3) {

Review Comment:
   Done.
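
The suggestion that "Done." answers is not included in this thread. For readers following the quoted hunk, the sketch below spells out what the arithmetic checks appear to rely on. The enum is a local mirror of `substrait::SortField::SortDirection`, and its numeric values are an assumption based on Substrait's `algebra.proto`; `SortOrder` is a stand-in for `compute::SortOrder`, and the explicit functions are illustrative alternatives, not necessarily what was merged.

```cpp
#include <cassert>

// Local mirror of substrait::SortField::SortDirection. The values are an
// assumption; verify them against the generated Substrait header.
enum SortDirection : int {
  SORT_DIRECTION_UNSPECIFIED = 0,
  SORT_DIRECTION_ASC_NULLS_FIRST = 1,
  SORT_DIRECTION_ASC_NULLS_LAST = 2,
  SORT_DIRECTION_DESC_NULLS_FIRST = 3,
  SORT_DIRECTION_DESC_NULLS_LAST = 4,
  SORT_DIRECTION_CLUSTERED = 5,
};

// Stand-in for compute::SortOrder.
enum class SortOrder { Ascending, Descending };

// Arithmetic form from the quoted hunk: odd values are treated as nulls-first.
bool IsSortNullsFirstArithmetic(SortDirection d) { return d % 2 == 1; }

// Explicit form: names the two nulls-first directions directly.
bool IsSortNullsFirst(SortDirection d) {
  return d == SORT_DIRECTION_ASC_NULLS_FIRST ||
         d == SORT_DIRECTION_DESC_NULLS_FIRST;
}

// Explicit sort order. Callers are expected to have rejected UNSPECIFIED and
// CLUSTERED already, as the FromProto code in this PR does.
SortOrder SortOrderFromDirection(SortDirection d) {
  switch (d) {
    case SORT_DIRECTION_ASC_NULLS_FIRST:
    case SORT_DIRECTION_ASC_NULLS_LAST:
      return SortOrder::Ascending;
    default:
      return SortOrder::Descending;
  }
}
```

Under these assumed values the two forms agree for the four supported directions, but the arithmetic form also reports `SORT_DIRECTION_CLUSTERED` (5) as nulls-first, so it is only safe after the clustered direction has been rejected.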




[GitHub] [arrow] icexelloss commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162872803


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;
+      compute::NullPlacement null_placement;
+      bool first = true;
+      for (const auto& sort : sort.sorts()) {
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED) {
+          return Status::Invalid(
+              "substrait::SortRel with sort that had unspecified direction");
+        }
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_CLUSTERED) {
+          return Status::NotImplemented(
+              "substrait::SortRel with sort with clustered sort direction");
+        }
+        // Substrait allows null placement to differ for each field.  Acero expects it to
+        // be consistent across all fields.  So we grab the null placement from the first
+        // key and verify all other keys have the same null placement
+        if (first) {
+          null_placement = IsSortNullsFirst(sort.direction())
+                               ? compute::NullPlacement::AtStart
+                               : compute::NullPlacement::AtEnd;
+        } else {
+          if ((null_placement == compute::NullPlacement::AtStart &&
+               !IsSortNullsFirst(sort.direction())) ||
+              (null_placement == compute::NullPlacement::AtEnd &&
+               IsSortNullsFirst(sort.direction()))) {
+            return Status::NotImplemented(
+                "substrait::SortRel with ordering with mixed null placement");
+          }
+        }
+        if (sort.sort_kind_case() != substrait::SortField::SortKindCase::kDirection) {
+          return Status::NotImplemented("substrait::SortRel with custom sort function");

Review Comment:
   This is more of a question: do we usually write test cases for these "unsupported" cases? I know they can be quite annoying to test (since there is no easy way to construct the Substrait input), so I'm just curious about the common practice.





[GitHub] [arrow] icexelloss commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162872803


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;
+      compute::NullPlacement null_placement;
+      bool first = true;
+      for (const auto& sort : sort.sorts()) {
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED) {
+          return Status::Invalid(
+              "substrait::SortRel with sort that had unspecified direction");
+        }
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_CLUSTERED) {
+          return Status::NotImplemented(
+              "substrait::SortRel with sort with clustered sort direction");
+        }
+        // Substrait allows null placement to differ for each field.  Acero expects it to
+        // be consistent across all fields.  So we grab the null placement from the first
+        // key and verify all other keys have the same null placement
+        if (first) {
+          null_placement = IsSortNullsFirst(sort.direction())
+                               ? compute::NullPlacement::AtStart
+                               : compute::NullPlacement::AtEnd;
+        } else {
+          if ((null_placement == compute::NullPlacement::AtStart &&
+               !IsSortNullsFirst(sort.direction())) ||
+              (null_placement == compute::NullPlacement::AtEnd &&
+               IsSortNullsFirst(sort.direction()))) {
+            return Status::NotImplemented(
+                "substrait::SortRel with ordering with mixed null placement");
+          }
+        }
+        if (sort.sort_kind_case() != substrait::SortField::SortKindCase::kDirection) {
+          return Status::NotImplemented("substrait::SortRel with custom sort function");

Review Comment:
   This is more of a question: do we usually write test cases for these "unsupported" cases? I know they can be quite annoying to test (since we need to use JSON), so I'm just curious about the common practice.




[GitHub] [arrow] vibhatha commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "vibhatha (via GitHub)" <gi...@apache.org>.
vibhatha commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162245382


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -5145,6 +5145,128 @@ TEST(Substrait, CompoundEmitWithFilter) {
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }
 
+TEST(Substrait, SortAndFetch) {
+  // Sort by A, ascending, take items [2, 5), then sort by B descending
+  std::string substrait_json = R"({
+    "version": {
+        "major_number": 9999,
+        "minor_number": 9999,
+        "patch_number": 9999
+    },
+    "relations": [
+        {
+            "rel": {
+                "sort": {
+                    "input": {
+                        "fetch": {
+                            "input": {
+                                "sort": {
+                                    "input": {
+                                        "read": {
+                                            "base_schema": {
+                                                "names": [
+                                                    "A",
+                                                    "B"
+                                                ],
+                                                "struct": {
+                                                    "types": [
+                                                        {
+                                                            "i32": {}
+                                                        },
+                                                        {
+                                                            "i32": {}
+                                                        }
+                                                    ]
+                                                }
+                                            },
+                                            "namedTable": {
+                                                "names": [
+                                                    "table"
+                                                ]
+                                            }
+                                        }
+                                    },
+                                    "sorts": [
+                                        {
+                                            "expr": {
+                                                "selection": {
+                                                    "directReference": {
+                                                        "structField": {
+                                                            "field": 0
+                                                        }
+                                                    },
+                                                    "rootReference": {}
+                                                }
+                                            },
+                                            "direction": "SORT_DIRECTION_ASC_NULLS_FIRST"
+                                        }
+                                    ]
+                                }
+                            },
+                            "offset": 2,
+                            "count": 3
+                        }
+                    },
+                    "sorts": [
+                        {
+                            "expr": {
+                                "selection": {
+                                    "directReference": {
+                                        "structField": {
+                                            "field": 1
+                                        }
+                                    },
+                                    "rootReference": {}
+                                }
+                            },
+                            "direction": "SORT_DIRECTION_DESC_NULLS_LAST"
+                        }
+                    ]
+                }
+            }
+        }
+    ],
+    "extension_uris": [],
+    "extensions": []
+})";
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
+  auto test_schema = schema({field("A", int32()), field("B", int32())});
+
+  auto input_table = TableFromJSON(test_schema, {R"([
+      [null, null],
+      [5, 8],
+      [null, null],
+      [null, null],
+      [3, 4],
+      [9, 6],
+      [4, 5]
+  ])"});
+
+  // First sort by A, ascending, nulls first to yield rows:
+  // 0, 2, 3, 4, 6, 1, 5
+  // Apply fetch to grab rows 3, 4, 6
+  // Then sort by B, descending, to yield rows 6, 4, 3
+
+  auto output_table = TableFromJSON(test_schema, {R"([
+    [4, 5],
+    [3, 4],
+    [null, null]
+  ])"});
+
+  NamedTableProvider table_provider = [&](const std::vector<std::string>& names,
+                                          const Schema&) {
+    std::shared_ptr<acero::ExecNodeOptions> options =
+        std::make_shared<acero::TableSourceNodeOptions>(input_table);
+    return acero::Declaration("table_source", {}, options, "mock_source");
+  };

Review Comment:
   Instead could we use the following?
   
   ```suggestion
     NamedTableProvider table_provider = AlwaysProvideSameTable(std::move(input_table));
   ```
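
   As a cross-check of the expected table in `SortAndFetch`, here is a minimal standalone sketch that replays sort, fetch, sort on the same seven rows using only the standard library. `Row`, `Fetch`, `SortFetchSort` and the other names are hypothetical stand-ins, not Acero APIs, and a `[null, null]` row is modelled with a single `valid` flag.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for one table row; valid == false models [null, null].
struct Row {
  bool valid;
  int a;
  int b;
};

// Stable sort by column A, ascending, nulls first.
void SortByAAscNullsFirst(std::vector<Row>* rows) {
  std::stable_sort(rows->begin(), rows->end(), [](const Row& l, const Row& r) {
    if (l.valid != r.valid) return !l.valid;  // a null sorts before a value
    return l.valid && l.a < r.a;
  });
}

// Stable sort by column B, descending, nulls last.
void SortByBDescNullsLast(std::vector<Row>* rows) {
  std::stable_sort(rows->begin(), rows->end(), [](const Row& l, const Row& r) {
    if (l.valid != r.valid) return l.valid;  // a value sorts before a null
    return l.valid && l.b > r.b;
  });
}

// Keep `count` rows starting at `offset`, clamped to the input size.
std::vector<Row> Fetch(const std::vector<Row>& rows, std::size_t offset,
                       std::size_t count) {
  const std::size_t first = std::min(offset, rows.size());
  const std::size_t last = std::min(first + count, rows.size());
  return std::vector<Row>(rows.begin() + first, rows.begin() + last);
}

// Same shape as the plan in the test: sort by A, fetch [2, 5), sort by B.
std::vector<Row> SortFetchSort(std::vector<Row> rows) {
  SortByAAscNullsFirst(&rows);
  std::vector<Row> fetched = Fetch(rows, 2, 3);
  SortByBDescNullsLast(&fetched);
  return fetched;
}

// Replays the test input and checks the three expected output rows.
void CheckSortAndFetchExample() {
  const std::vector<Row> input = {{false, 0, 0}, {true, 5, 8}, {false, 0, 0},
                                  {false, 0, 0}, {true, 3, 4}, {true, 9, 6},
                                  {true, 4, 5}};
  const std::vector<Row> out = SortFetchSort(input);
  assert(out.size() == 3);
  assert(out[0].valid && out[0].a == 4 && out[0].b == 5);
  assert(out[1].valid && out[1].a == 3 && out[1].b == 4);
  assert(!out[2].valid);
}
```

   The sketch uses `std::stable_sort` for determinism. Since the three null rows are identical, the result should not depend on how the real sort node orders them.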





[GitHub] [arrow] vibhatha commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "vibhatha (via GitHub)" <gi...@apache.org>.
vibhatha commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162241777


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -367,6 +367,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;

Review Comment:
   This is efficient, but shall we leave a comment explaining it?





[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1198951614


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -714,6 +731,96 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
 
       return ProcessEmit(join, join_declaration, join_schema);
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;
+      compute::NullPlacement null_placement;
+      bool first = true;
+      for (const auto& sort : sort.sorts()) {
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED) {
+          return Status::Invalid(
+              "substrait::SortRel with sort that had unspecified direction");
+        }
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_CLUSTERED) {
+          return Status::NotImplemented(
+              "substrait::SortRel with sort with clustered sort direction");
+        }
+        // Substrait allows null placement to differ for each field.  Acero expects it to
+        // be consistent across all fields.  So we grab the null placement from the first
+        // key and verify all other keys have the same null placement
+        if (first) {
+          null_placement = IsSortNullsFirst(sort.direction())
+                               ? compute::NullPlacement::AtStart
+                               : compute::NullPlacement::AtEnd;
+          first = false;
+        } else {
+          if ((null_placement == compute::NullPlacement::AtStart &&
+               !IsSortNullsFirst(sort.direction())) ||
+              (null_placement == compute::NullPlacement::AtEnd &&
+               IsSortNullsFirst(sort.direction()))) {

Review Comment:
   I've cleaned this up.





[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1194256801


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -367,6 +367,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;
+}
+
+compute::SortOrder SortOrderFromDirection(
+    const substrait::SortField::SortDirection& direction) {
+  if (direction < 3) {
+    return compute::SortOrder::Ascending;

Review Comment:
   Hmm, technically an unspecified direction would be an invalid plan, I think.  So it would probably be better to reject it.





[GitHub] [arrow] westonpace commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1548411626

   I've rebased just in case and will let CI run one more time.




[GitHub] [arrow] pitrou commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1195279530


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -714,6 +731,96 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
 
       return ProcessEmit(join, join_declaration, join_schema);
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;
+      compute::NullPlacement null_placement;
+      bool first = true;
+      for (const auto& sort : sort.sorts()) {
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED) {
+          return Status::Invalid(
+              "substrait::SortRel with sort that had unspecified direction");
+        }
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_CLUSTERED) {
+          return Status::NotImplemented(
+              "substrait::SortRel with sort with clustered sort direction");
+        }
+        // Substrait allows null placement to differ for each field.  Acero expects it to
+        // be consistent across all fields.  So we grab the null placement from the first
+        // key and verify all other keys have the same null placement
+        if (first) {
+          null_placement = IsSortNullsFirst(sort.direction())
+                               ? compute::NullPlacement::AtStart
+                               : compute::NullPlacement::AtEnd;
+          first = false;
+        } else {
+          if ((null_placement == compute::NullPlacement::AtStart &&
+               !IsSortNullsFirst(sort.direction())) ||
+              (null_placement == compute::NullPlacement::AtEnd &&
+               IsSortNullsFirst(sort.direction()))) {

Review Comment:
   You can change `IsSortNullsFirst` to return a `compute::NullPlacement` directly and this will make these lines a bit simpler (just compare the old null placement with the new one).
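
   A minimal sketch of what that comparison could look like once each key already yields a placement value. `NullPlacement` here is a local stand-in for `compute::NullPlacement`, and `CommonNullPlacement` is a hypothetical helper name, not something in the PR.

```cpp
#include <cassert>
#include <vector>

// Local stand-in for compute::NullPlacement (assumption for this sketch).
enum class NullPlacement { AtStart, AtEnd };

// Returns true and stores the shared placement in *out when every sort key
// agrees. Returns false for an empty list, or as soon as one key differs from
// the first; in that case *out must not be relied upon.
bool CommonNullPlacement(const std::vector<NullPlacement>& per_key,
                         NullPlacement* out) {
  assert(out != nullptr);
  if (per_key.empty()) return false;
  *out = per_key.front();
  for (NullPlacement placement : per_key) {
    if (placement != *out) return false;  // mixed null placement
  }
  return true;
}
```

   Seeding from the first key also removes the need for a separate `first` flag in the loop.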



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;

Review Comment:
   It certainly wouldn't hurt!



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -329,6 +329,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;
+}
+
+compute::SortOrder SortOrderFromDirection(
+    const substrait::SortField::SortDirection& direction) {
+  if (direction < 3) {

Review Comment:
   Same comment here.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -329,6 +329,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;

Review Comment:
   Hmm, rather than arithmetic on enum values, I'd rather see a proper `switch/case` statement to make the code more readable and maintainable. This is not so performance-sensitive that it must be optimized to the last nanosecond.
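
   For illustration only, a rough sketch of the `switch/case` shape. The three enums are local stand-ins (the `SortDirection` values mirror the Substrait proto enum quoted elsewhere in this thread), and the bool-plus-out-parameter signature is an assumption made to keep the sketch free of Arrow headers; the real code would more likely return a `Result` or `Status`.

```cpp
#include <cassert>

// Local stand-in for substrait::SortField::SortDirection (assumption).
enum class SortDirection {
  kUnspecified = 0,
  kAscNullsFirst = 1,
  kAscNullsLast = 2,
  kDescNullsFirst = 3,
  kDescNullsLast = 4,
  kClustered = 5,
};

// Local stand-ins for compute::NullPlacement and compute::SortOrder.
enum class NullPlacement { AtStart, AtEnd };
enum class SortOrder { Ascending, Descending };

// Writes the null placement to *out and returns true, or returns false for
// directions that carry no null placement (unspecified, clustered).
bool NullPlacementFromDirection(SortDirection direction, NullPlacement* out) {
  assert(out != nullptr);
  switch (direction) {
    case SortDirection::kAscNullsFirst:
    case SortDirection::kDescNullsFirst:
      *out = NullPlacement::AtStart;
      return true;
    case SortDirection::kAscNullsLast:
    case SortDirection::kDescNullsLast:
      *out = NullPlacement::AtEnd;
      return true;
    case SortDirection::kUnspecified:
    case SortDirection::kClustered:
      return false;
  }
  return false;  // value outside the known enum range
}

// Writes the sort order to *out and returns true, or returns false for
// directions that carry no ordering (unspecified, clustered).
bool SortOrderFromDirection(SortDirection direction, SortOrder* out) {
  assert(out != nullptr);
  switch (direction) {
    case SortDirection::kAscNullsFirst:
    case SortDirection::kAscNullsLast:
      *out = SortOrder::Ascending;
      return true;
    case SortDirection::kDescNullsFirst:
    case SortDirection::kDescNullsLast:
      *out = SortOrder::Descending;
      return true;
    case SortDirection::kUnspecified:
    case SortDirection::kClustered:
      return false;
  }
  return false;  // value outside the known enum range
}
```

   Listing every enumerator explicitly means a new direction added upstream shows up as a compiler warning instead of silently falling into one of the arithmetic buckets.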





[GitHub] [arrow] westonpace commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1480154009

   CC @vibhatha / @rtpsw mind taking a look?




[GitHub] [arrow] github-actions[bot] commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1477292163

   :warning: GitHub issue #32763 **has been automatically assigned in GitHub** to PR creator.




[GitHub] [arrow] pitrou merged pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou merged PR #34651:
URL: https://github.com/apache/arrow/pull/34651




[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1198950813


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -329,6 +329,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;

Review Comment:
   I've switched to proper handling of these cases and I return NullPlacement and SortOrder.  I agree the code is more readable now.





[GitHub] [arrow] amol- commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "amol- (via GitHub)" <gi...@apache.org>.
amol- commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1497230282

   @vibhatha @rtpsw are you able to take a look, as suggested by @westonpace?
   If this one is mostly complete and only needs rebasing, it would be great to get it reviewed and shipped before v12 is released.




[GitHub] [arrow] vibhatha commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "vibhatha (via GitHub)" <gi...@apache.org>.
vibhatha commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1502545427

   @amol- @westonpace I will take a look. 




[GitHub] [arrow] vibhatha commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "vibhatha (via GitHub)" <gi...@apache.org>.
vibhatha commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162242002


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -367,6 +367,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;

Review Comment:
   But `CLUSTERED` (value 5) has no null preference, and `5 % 2 == 1` would still report it as nulls-first.





[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1167299286


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;
+      compute::NullPlacement null_placement;
+      bool first = true;
+      for (const auto& sort : sort.sorts()) {
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED) {
+          return Status::Invalid(
+              "substrait::SortRel with sort that had unspecified direction");
+        }
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_CLUSTERED) {
+          return Status::NotImplemented(
+              "substrait::SortRel with sort with clustered sort direction");
+        }
+        // Substrait allows null placement to differ for each field.  Acero expects it to
+        // be consistent across all fields.  So we grab the null placement from the first
+        // key and verify all other keys have the same null placement
+        if (first) {

Review Comment:
   Good catch.  This was a bug.  I've added a test case to make sure we are correctly detecting and rejecting mixed null placement.





[GitHub] [arrow] westonpace commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1502355948

   I think @rtpsw is out this week but maybe @icexelloss can take a look




[GitHub] [arrow] icexelloss commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162867342


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;
+      compute::NullPlacement null_placement;
+      bool first = true;
+      for (const auto& sort : sort.sorts()) {
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED) {
+          return Status::Invalid(
+              "substrait::SortRel with sort that had unspecified direction");
+        }
+        if (sort.direction() == substrait::SortField::SortDirection::
+                                    SortField_SortDirection_SORT_DIRECTION_CLUSTERED) {
+          return Status::NotImplemented(
+              "substrait::SortRel with sort with clustered sort direction");
+        }
+        // Substrait allows null placement to differ for each field.  Acero expects it to
+        // be consistent across all fields.  So we grab the null placement from the first
+        // key and verify all other keys have the same null placement
+        if (first) {

Review Comment:
   Looks like `first` was initialized to `true` but I don't see any code changing its value?





[GitHub] [arrow] vibhatha commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "vibhatha (via GitHub)" <gi...@apache.org>.
vibhatha commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1162239721


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -367,6 +367,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;
+}
+
+compute::SortOrder SortOrderFromDirection(
+    const substrait::SortField::SortDirection& direction) {
+  if (direction < 3) {
+    return compute::SortOrder::Ascending;

Review Comment:
   Do we consider `UNSPECIFIED` as Ascending? 
   
   Here: 
   
   ```proto
    enum SortDirection {
       SORT_DIRECTION_UNSPECIFIED = 0;
       SORT_DIRECTION_ASC_NULLS_FIRST = 1;
       SORT_DIRECTION_ASC_NULLS_LAST = 2;
       SORT_DIRECTION_DESC_NULLS_FIRST = 3;
       SORT_DIRECTION_DESC_NULLS_LAST = 4;
       SORT_DIRECTION_CLUSTERED = 5;
     }
   ```
   
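   I assume that, since values 1 and 2 are ascending, the `< 3` check is meant to pick them, but it treats the first three values (0 through 2) as ascending, which includes `UNSPECIFIED`.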
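The edge cases can be checked mechanically. The sketch below reproduces the two arithmetic checks from the diff on a hypothetical enum that mirrors the values quoted above (it is not the generated Substrait header, and `IsAscending` is an illustrative name for the `direction < 3` test). The `static_assert`s show which out-of-band values slip through.

```cpp
// Hypothetical mirror of the SortDirection values quoted from the proto.
enum SortDirection : int {
  SORT_DIRECTION_UNSPECIFIED = 0,
  SORT_DIRECTION_ASC_NULLS_FIRST = 1,
  SORT_DIRECTION_ASC_NULLS_LAST = 2,
  SORT_DIRECTION_DESC_NULLS_FIRST = 3,
  SORT_DIRECTION_DESC_NULLS_LAST = 4,
  SORT_DIRECTION_CLUSTERED = 5
};

// The two checks from the diff, applied to the stand-in enum.
constexpr bool IsSortNullsFirst(SortDirection direction) {
  return direction % 2 == 1;
}
constexpr bool IsAscending(SortDirection direction) { return direction < 3; }

// The intended cases behave as expected...
static_assert(IsAscending(SORT_DIRECTION_ASC_NULLS_FIRST), "1 is ascending");
static_assert(IsAscending(SORT_DIRECTION_ASC_NULLS_LAST), "2 is ascending");
static_assert(!IsAscending(SORT_DIRECTION_DESC_NULLS_FIRST), "3 is descending");
// ...but the values outside the 1..4 range are classified as well.
static_assert(IsAscending(SORT_DIRECTION_UNSPECIFIED),
              "UNSPECIFIED (0) is treated as ascending");
static_assert(IsSortNullsFirst(SORT_DIRECTION_CLUSTERED),
              "CLUSTERED (5) is treated as nulls-first");
```

Because both helpers return a plain value, neither has a way to report that `UNSPECIFIED` or `CLUSTERED` is not a valid input; that has to be checked separately by the caller.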





[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1198949157


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -367,6 +367,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;

Review Comment:
   We now return a `NotImplemented` error on `CLUSTERED`:
   
   ```cpp
         case substrait::SortField::SortDirection::
             SortField_SortDirection_SORT_DIRECTION_CLUSTERED:
         default:
           return Status::NotImplemented(
               "Acero does not support the specified sort direction: ", dir);
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -757,6 +774,95 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
       return ProcessEmit(std::move(join), std::move(join_declaration),
                          std::move(join_schema));
     }
+    case substrait::Rel::RelTypeCase::kFetch: {
+      const auto& fetch = rel.fetch();
+      RETURN_NOT_OK(CheckRelCommon(fetch, conversion_options));
+
+      if (!fetch.has_input()) {
+        return Status::Invalid("substrait::FetchRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(fetch.input(), ext_set, conversion_options));
+
+      int64_t offset = fetch.offset();
+      int64_t count = fetch.count();
+
+      acero::Declaration fetch_dec{
+          "fetch", {input.declaration}, acero::FetchNodeOptions(offset, count)};
+
+      DeclarationInfo fetch_declaration{std::move(fetch_dec), input.output_schema};
+      return ProcessEmit(fetch, std::move(fetch_declaration),
+                         fetch_declaration.output_schema);
+    }
+    case substrait::Rel::RelTypeCase::kSort: {
+      const auto& sort = rel.sort();
+      RETURN_NOT_OK(CheckRelCommon(sort, conversion_options));
+
+      if (!sort.has_input()) {
+        return Status::Invalid("substrait::SortRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input,
+                            FromProto(sort.input(), ext_set, conversion_options));
+
+      if (sort.sorts_size() == 0) {
+        return Status::Invalid("substrait::SortRel with no sorts");
+      }
+
+      std::vector<compute::SortKey> sort_keys;

Review Comment:
   I've added the `reserve` call.





[GitHub] [arrow] westonpace commented on a diff in pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34651:
URL: https://github.com/apache/arrow/pull/34651#discussion_r1198947322


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -367,6 +367,23 @@ ARROW_ENGINE_EXPORT Result<DeclarationInfo> MakeAggregateDeclaration(
 
 }  // namespace internal
 
+namespace {
+
+bool IsSortNullsFirst(const substrait::SortField::SortDirection& direction) {
+  return direction % 2 == 1;
+}
+
+compute::SortOrder SortOrderFromDirection(
+    const substrait::SortField::SortDirection& direction) {
+  if (direction < 3) {
+    return compute::SortOrder::Ascending;

Review Comment:
   This is now an explicit rejection.
   
   ```cpp
         case substrait::SortField::SortDirection::
             SortField_SortDirection_SORT_DIRECTION_UNSPECIFIED:
           return Status::Invalid("The substrait plan does not specify a sort direction");
   ```
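For readers following the thread, a self-contained sketch of what a switch-based mapping along these lines might look like is given below. It is an assumption-laden illustration rather than the merged code: the enums and `SortSpec` are hypothetical stand-ins for the Substrait and `compute` types, `SortSpecFromDirection` is an invented name, and errors are reported through `std::optional` plus an out-parameter instead of Arrow's `Status`.

```cpp
#include <optional>
#include <string>

// Hypothetical stand-ins for the Substrait and Arrow compute enums.
enum class SortDirection {
  Unspecified = 0,
  AscNullsFirst = 1,
  AscNullsLast = 2,
  DescNullsFirst = 3,
  DescNullsLast = 4,
  Clustered = 5
};
enum class SortOrder { Ascending, Descending };
enum class NullPlacement { AtStart, AtEnd };

struct SortSpec {
  SortOrder order;
  NullPlacement null_placement;
};

// Maps each supported direction explicitly. On failure, returns std::nullopt
// and writes a message to *error.
std::optional<SortSpec> SortSpecFromDirection(SortDirection dir,
                                              std::string* error) {
  switch (dir) {
    case SortDirection::AscNullsFirst:
      return SortSpec{SortOrder::Ascending, NullPlacement::AtStart};
    case SortDirection::AscNullsLast:
      return SortSpec{SortOrder::Ascending, NullPlacement::AtEnd};
    case SortDirection::DescNullsFirst:
      return SortSpec{SortOrder::Descending, NullPlacement::AtStart};
    case SortDirection::DescNullsLast:
      return SortSpec{SortOrder::Descending, NullPlacement::AtEnd};
    case SortDirection::Unspecified:
      *error = "The substrait plan does not specify a sort direction";
      return std::nullopt;
    case SortDirection::Clustered:
    default:
      *error = "The specified sort direction is not supported";
      return std::nullopt;
  }
}
```

Listing every case removes the dependence on the numeric layout of the enum, so a value added to the proto later falls into the `default` branch instead of being silently classified.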







[GitHub] [arrow] ursabot commented on pull request #34651: GH-32763: [C++] Add FromProto for fetch & sort

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #34651:
URL: https://github.com/apache/arrow/pull/34651#issuecomment-1569364824

   Benchmark runs are scheduled for baseline = 2a6848c82cfd141ab6ed07a29d24be32b0dede09 and contender = fbe0d5f155b200d0ba97640f0c7682ce89470558. fbe0d5f155b200d0ba97640f0c7682ce89470558 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/17b4a87d9e8b4e18b1c5ae6ecf830631...ab127cf7c191483d9ba89fda37fc3950/)
   [Finished :arrow_down:0.18% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/8e41472ebe194ee9ac6cc3f1822c1005...544d0342f91d42f990ecc1f09d0d482a/)
   [Finished :arrow_down:0.98% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/52d7564b17664a8a9cc3ce37dc4ff13e...dab8f8deb9ee4c169abd4c5bd8ecb969/)
   [Finished :arrow_down:0.51% :arrow_up:0.09%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/9d0f51fa86c64cf885afdfbb1c6f7c2e...b7c68c421dc24ff6a39833a7610d85d3/)
   Buildkite builds:
   [Finished] [`fbe0d5f1` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2909)
   [Finished] [`fbe0d5f1` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2945)
   [Finished] [`fbe0d5f1` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2910)
   [Finished] [`fbe0d5f1` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2935)
   [Finished] [`2a6848c8` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2908)
   [Finished] [`2a6848c8` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2944)
   [Finished] [`2a6848c8` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2909)
   [Finished] [`2a6848c8` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2934)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   

