Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/12 12:46:51 UTC

[GitHub] [arrow] rtpsw opened a new pull request, #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw opened a new pull request, #14385:
URL: https://github.com/apache/arrow/pull/14385

   See https://issues.apache.org/jira/browse/ARROW-17980


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994890093


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 3]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait.AsOfJoinRel",
+              "on": {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 0,
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              "by": [

Review Comment:
   I am confused:
   https://github.com/apache/arrow/blob/ec579df631deaa8f6186208ed2a4ebec00581dfa/cpp/src/arrow/compute/exec/options.h#L411
   
   It looks like "by" is one per table, while "on" is a single one for all the tables. So shouldn't the Substrait message contain multiple entries for the "by" key?



[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r997055795


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {

Review Comment:
   Can you add some documentation for the message?



[GitHub] [arrow] icexelloss commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

icexelloss commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1283962508

   > > > @rtpsw Looks pretty good! There seem to be many style changes that seem orthogonal. Were those changes made automatically? Is it easy to separate the style changes from the actual change?
   > > 
   > > 
   > > If we insist, we could take these style changes out to a separate PR, get that merged, and then come back here to rebase.
   > 
   > Anja suggested changing `arrow::engine::substrait` to `arrow::engine::substrait_ext` to avoid MSVC name conflicts with symbols in `::substrait`. This should allow us to keep using `substrait::` instead of `::substrait::` and avoid a large orthogonal change.
   
   Does `arrow::engine::substrait::extension` work?


[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994952758


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+  [... lines identical to the serde_test.cc hunk quoted in discussion_r994890093 above, omitted ...]
+              "by": [

Review Comment:
   I see, this makes sense now.
   
   Unfortunately, I think the limitation is too strict: it requires all input tables to have the same column index for the by-key.
   The restriction on the on-key is OK, since we already require "time" to be the first column. However, we have no such restriction on the by-key column; it can have a different column index in each input table.
   
   I think we need to fix this problem for as-of join to be useful. Since we are deciding the shape of the Substrait message now, it makes sense to fix both the message format and the as-of join options format in this PR.
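   To make the restriction concrete, here is a minimal standalone sketch in plain C++ (not Arrow's API; `ColumnNames`, `FindColumn`, `ResolveByKeyPerInput` and `SharedIndexSuffices` are hypothetical names) that resolves one by-key name separately in each input schema. When the key sits at different positions, no single shared column index can describe it.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical simplified schema: just the column names, in order.
using ColumnNames = std::vector<std::string>;

// Returns the position of `name` in `schema`, or std::nullopt if it is absent.
std::optional<int> FindColumn(const ColumnNames& schema, const std::string& name) {
  for (std::size_t i = 0; i < schema.size(); ++i) {
    if (schema[i] == name) return static_cast<int>(i);
  }
  return std::nullopt;
}

// Resolves the same by-key name in every input. The resulting indices may
// differ from table to table; std::nullopt means some input lacks the column.
std::optional<std::vector<int>> ResolveByKeyPerInput(
    const std::vector<ColumnNames>& inputs, const std::string& by_key) {
  std::vector<int> indices;
  for (const auto& schema : inputs) {
    std::optional<int> idx = FindColumn(schema, by_key);
    if (!idx) return std::nullopt;
    indices.push_back(*idx);
  }
  return indices;
}

// True only when one shared column index describes the by-key in all inputs,
// i.e. when a single index-based by-key would be enough.
bool SharedIndexSuffices(const std::vector<int>& per_input) {
  for (int i : per_input) {
    if (i != per_input.front()) return false;
  }
  return true;
}
```

   With `T1 = [time, key, value1]` and `T2 = [time, value2, key]`, the by-key resolves to indices 1 and 2, which a single shared index cannot express.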
   



[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994877834


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -951,28 +953,27 @@ class AsofJoinNode : public ExecNode {
   }
 
   static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
-      const std::vector<ExecNode*>& inputs,
+      const std::vector<std::shared_ptr<Schema>> input_schema,
       const std::vector<col_index_t>& indices_of_on_key,
       const std::vector<std::vector<col_index_t>>& indices_of_by_key) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
 
-    size_t n_by = indices_of_by_key[0].size();
+    size_t n_by = indices_of_by_key.size() == 0 ? 0 : indices_of_by_key[0].size();

Review Comment:
   Yes, this happens when the by-key is empty; I observed it before adding the code change here.
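   A minimal sketch of why the guard matters, assuming (as in the diff) that `indices_of_by_key` holds one inner vector of by-key column indices per input. The function name `NumByKeyColumns` is hypothetical and the snippet is standalone C++, not the Arrow source.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the col_index_t alias used in asof_join_node.cc.
using col_index_t = int;

// indices_of_by_key[i] lists the by-key column indices of input i, so the
// number of by-key columns is the size of any one inner vector. Reading
// indices_of_by_key[0] on an empty outer vector is undefined behavior,
// which is what the guard avoids.
std::size_t NumByKeyColumns(
    const std::vector<std::vector<col_index_t>>& indices_of_by_key) {
  return indices_of_by_key.empty() ? 0 : indices_of_by_key[0].size();
}
```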



[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1281039384

   > @rtpsw Looks pretty good! There seem to be many style changes that seem orthogonal. Were those changes made automatically? Is it easy to separate the style changes from the actual change?
   
   If we insist, we could take these style changes out to a separate PR, get that merged, and then come back here to rebase.


[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1277281595

   @westonpace, any idea why some jobs, like [this one](https://github.com/apache/arrow/actions/runs/3240443758/jobs/5311073777), failed? It seems to be complaining about missing proto classes; I don't know why the proto classes would get built in some jobs and not in others.


[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1276180575

   Note that this PR includes code from https://github.com/apache/arrow/pull/14386 which is currently pending.


[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994876578


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {
+  .substrait.Expression on = 1;

Review Comment:
   > We are using StructField for this so I don't think we can use name for this, right?
   
   IIRC, Arrow Substrait currently only supports [`StructField` message](https://github.com/substrait-io/substrait/blob/f3f6bdc947e689e800279666ff33f118e42d2146/proto/substrait/algebra.proto#L856) but Substrait itself also provides a [`MapKey` message](https://github.com/substrait-io/substrait/blob/f3f6bdc947e689e800279666ff33f118e42d2146/proto/substrait/algebra.proto#L855) that can be used to reference by name.
   
   > I think it probably makes more sense to have this only be a repeated field in case that on column doesn't share the same column index for each table
   
   If we go down this path (see also [my comment here](https://github.com/apache/arrow/pull/14385#discussion_r994865351)), I think we'd also need to repeat the by-key, i.e., to wrap its existing repetition as a message and repeat that.
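   The "wrap and repeat" shape could look roughly like the hypothetical C++ mirror below, where each input carries its own on-key and by-key column indices. `InputKeys` and `ValidateInputKeys` are illustrative names only, and the two invariants checked are assumptions about what such a message would need, not rules taken from Arrow or Substrait.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical per-input key set: what one repeated entry would carry.
struct InputKeys {
  int on_key;               // column index of the on-key in this input
  std::vector<int> by_key;  // column indices forming this input's by-key
};

// Checks two assumed invariants: exactly one key set per input, and the same
// number of by-key columns in every input so keys can be compared pairwise.
// Returns an empty string on success, otherwise a description of the problem.
std::string ValidateInputKeys(const std::vector<InputKeys>& keys,
                              std::size_t n_inputs) {
  if (keys.size() != n_inputs) return "expected one key set per input";
  for (const InputKeys& k : keys) {
    if (k.by_key.size() != keys.front().by_key.size()) {
      return "by-key column count differs between inputs";
    }
  }
  return "";
}
```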



[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r995581529


##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <iostream>
+
+#include "arrow/engine/substrait/options.h"
+
+#include <google/protobuf/util/json_util.h>
+#include "arrow/compute/exec/asof_join_node.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/engine/substrait/expression_internal.h"
+#include "arrow/engine/substrait/relation_internal.h"
+#include "substrait/extension_rels.pb.h"
+
+namespace arrow {
+namespace engine {
+
+class DefaultExtensionProvider : public ExtensionProvider {
+ public:
+  Result<DeclarationInfo> MakeRel(const std::vector<DeclarationInfo>& inputs,
+                                  const google::protobuf::Any& rel,
+                                  const ExtensionSet& ext_set) override {
+    if (rel.Is<arrow::substrait::AsOfJoinRel>()) {
+      arrow::substrait::AsOfJoinRel as_of_join_rel;
+      rel.UnpackTo(&as_of_join_rel);
+      return MakeAsOfJoinRel(inputs, as_of_join_rel, ext_set);
+    }
+    return Status::NotImplemented("Unrecognized extension in Substrait plan: ",
+                                  rel.DebugString());
+  }
+
+ private:
+  Result<DeclarationInfo> MakeAsOfJoinRel(
+      const std::vector<DeclarationInfo>& inputs,
+      const arrow::substrait::AsOfJoinRel& as_of_join_rel, const ExtensionSet& ext_set) {
+    if (inputs.size() < 2) {
+      return Status::Invalid("substrait::AsOfJoinNode too few input tables: ",
+                             inputs.size());
+    }
+    // on-key
+    if (!as_of_join_rel.has_on()) {
+      return Status::Invalid("substrait::AsOfJoinNode missing on-key");
+    }
+    ARROW_ASSIGN_OR_RAISE(auto on_key_expr, FromProto(as_of_join_rel.on(), ext_set, {}));
+    if (on_key_expr.field_ref() == NULLPTR) {
+      return Status::NotImplemented("substrait::AsOfJoinNode non-field-ref on-key");
+    }
+    const FieldRef& on_key = *on_key_expr.field_ref();
+
+    // by-key
+    std::vector<FieldRef> by_key;

Review Comment:
   As discussed, the vector refers to columns, so it is a single "by" key.
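   In other words, a by-key listed as several columns is one composite key, not several keys. A minimal sketch under simplifying assumptions (rows modeled as vectors of integers; `Row`, `ExtractByKey` and `SameByKey` are hypothetical names, not Arrow APIs):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical row model: one integer value per column.
using Row = std::vector<std::int64_t>;

// Collects the values of all by-key columns; together they form ONE key.
std::vector<std::int64_t> ExtractByKey(const Row& row,
                                       const std::vector<std::size_t>& by_key) {
  std::vector<std::int64_t> key;
  key.reserve(by_key.size());
  for (std::size_t col : by_key) key.push_back(row.at(col));
  return key;
}

// Two rows belong to the same by-key group only if every by-key column matches.
bool SameByKey(const Row& a, const Row& b,
               const std::vector<std::size_t>& by_key) {
  return ExtractByKey(a, by_key) == ExtractByKey(b, by_key);
}
```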



[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1283967621

   > Does `arrow::engine::substrait::extension` work?
   
   My guess is that this would cause the same MSVC problem: when MSVC sees a `substrait::*` symbol, it would look it up under `arrow::engine::substrait` and would not fall back to `::substrait`.
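   The shadowing can be reproduced with ordinary C++ name lookup. In this standalone sketch the global `substrait` namespace stands in for the protobuf-generated one, and `shadowed`, `renamed` and `Plan` are hypothetical names. The nested `Plan` is given a definition so the sketch compiles and shows which one is picked; in the real code the nested namespace has no such member, so the unqualified form fails to compile instead. Compiler-specific differences (the thread mentions MSVC) are not modeled.

```cpp
#include <cassert>
#include <string>

// Stand-in for the protobuf-generated top-level ::substrait namespace.
namespace substrait {
struct Plan {
  std::string Origin() const { return "global"; }
};
}  // namespace substrait

// Layout A: any nested namespace called `substrait` (including one introduced
// by declaring arrow::engine::substrait::extension) shadows the global one.
namespace shadowed {
namespace substrait {
struct Plan {
  std::string Origin() const { return "nested"; }
};
}  // namespace substrait

// Unqualified `substrait::` finds shadowed::substrait first and stops there.
inline std::string Unqualified() { return substrait::Plan{}.Origin(); }

// A leading `::` starts lookup at the global namespace.
inline std::string Qualified() { return ::substrait::Plan{}.Origin(); }
}  // namespace shadowed

// Layout B: the nested namespace is renamed (e.g. substrait_ext), so nothing
// shadows ::substrait and the unqualified form reaches the global namespace.
namespace renamed {
namespace substrait_ext {
struct Marker {};
}  // namespace substrait_ext

inline std::string Unqualified() { return substrait::Plan{}.Origin(); }
}  // namespace renamed
```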


[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994865351


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1030,6 +1031,32 @@ class AsofJoinNode : public ExecNode {
     return match.indices()[0];
   }
 
+  static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
+      const std::vector<std::shared_ptr<Schema>>& input_schema, const FieldRef& on_key) {
+    size_t n_input = input_schema.size();
+    std::vector<col_index_t> indices_of_on_key(n_input);
+    for (size_t i = 0; i < n_input; ++i) {
+      ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
+                            FindColIndex(*input_schema[i], on_key, "on"));

Review Comment:
   Note that the code here already appeared in the pre-PR code, just moved a bit.
   
   I suspect going down the path you describe, which I'm willing to try, would be more complicated than you suggest, because even `AsofJoinNodeOptions` does not currently support distinct field references per input (whether index- or name-based). Therefore, I think the required fix would touch many places in the code (and we would normally allocate a separate task for such a fix).



[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1276213240

   > @rtpsw Why do we need #14386 for this? Can we separate out these two changes?
   
   Technically, we don't, or no longer do. I used #14386 while developing this PR's test cases. If #14386 is rejected, we could remove its code here too. Otherwise, the code can stay here and will merge cleanly with #14386.


[GitHub] [arrow] icexelloss commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

icexelloss commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1276209689

   @westonpace The changes here depend on your local changes in https://github.com/westonpace/arrow/tree/experiment/substrait-extension
   
   How do you want to proceed here? We can either 
   (1) merge changes in experiment/substrait-extension and rebase this PR
   (2) combine changes in experiment/substrait-extension and the asof join message together in this PR and review together
    


[GitHub] [arrow] icexelloss commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

icexelloss commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1277725398

   @rtpsw I did one round of review. At a high level I think this makes a lot of sense. I left some comments for refinement.


[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r997105107


##########
cpp/src/arrow/engine/substrait/expression_internal.cc:
##########
@@ -52,16 +52,16 @@ Id NormalizeFunctionName(Id id) {
 
 }  // namespace
 
-Status DecodeArg(const substrait::FunctionArgument& arg, uint32_t idx,
+Status DecodeArg(const ::substrait::FunctionArgument& arg, uint32_t idx,

Review Comment:
   Unfortunately, no. I observed many CI jobs failures without these changes.



[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r997101243


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {
+  repeated AsOfJoinKeys input_keys = 1;

Review Comment:
   I think `keys` is a bit more concise/better here.



[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1276133886

   cc @icexelloss @westonpace 


[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994883955


##########
cpp/src/arrow/engine/substrait/options.h:
##########
@@ -32,7 +36,7 @@ namespace engine {
 /// How strictly to adhere to the input structure when converting between Substrait and
 /// Acero representations of a plan. This allows the user to trade conversion accuracy
 /// for performance and lenience.
-enum class ConversionStrictness {
+enum class ARROW_ENGINE_EXPORT ConversionStrictness {

Review Comment:
   It's a [visibility macro](https://github.com/apache/arrow/pull/14385/files#diff-e803a80147d4db13f4a3eee8126ce922ca91a6c6c2295ca3c60d292514c66d1fR22). It tells the linker to export the symbols for the construct following it.
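For readers unfamiliar with the pattern, here is a minimal sketch of how such a visibility macro is commonly defined. `MYLIB_EXPORT` and `MYLIB_EXPORTING` are hypothetical names; this is not Arrow's definition of `ARROW_ENGINE_EXPORT` (see the linked line for the real one).

```cpp
// Hypothetical export macro for a library called "mylib" (illustration only).
#include <cassert>

#if defined(_WIN32)
#  if defined(MYLIB_EXPORTING)
#    define MYLIB_EXPORT __declspec(dllexport)  // building the DLL
#  else
#    define MYLIB_EXPORT __declspec(dllimport)  // consuming the DLL
#  endif
#else
// On GCC/Clang, "default" visibility keeps the symbol exported even when the
// library is compiled with -fvisibility=hidden.
#  define MYLIB_EXPORT __attribute__((visibility("default")))
#endif

// The macro sits between the class-key and the name, as in
// `class ARROW_ENGINE_EXPORT ExtensionProvider`.
class MYLIB_EXPORT Counter {
 public:
  int Next() { return ++value_; }

 private:
  int value_ = 0;
};
```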





[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r996269166


##########
cpp/src/arrow/engine/substrait/options.h:
##########
@@ -65,16 +69,27 @@ using NamedTableProvider =
     std::function<Result<compute::Declaration>(const std::vector<std::string>&)>;
 static NamedTableProvider kDefaultNamedTableProvider;
 
+class ARROW_ENGINE_EXPORT ExtensionProvider {
+ public:
+  static std::shared_ptr<ExtensionProvider> kDefaultExtensionProvider;

Review Comment:
   Not sure, but I think it's fine to leave as is in this PR.





[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1288843658

   The commit history got messed up. I'll try to open a fresh PR.




[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994578624


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {
+  .substrait.Expression on = 1;

Review Comment:
   The proto accepts a general `Expression`, which could refer to a field by name rather than by index. On the other hand, the Arrow Substrait code in this PR only handles field-index expressions; this can be extended if desired.



##########
cpp/src/arrow/compare.cc:
##########
@@ -305,6 +305,11 @@ class RangeDataEqualsImpl {
   Status Visit(const StructType& type) {
     const int32_t num_fields = type.num_fields();
 
+    if (left_.child_data.size() != static_cast<size_t>(num_fields) ||

Review Comment:
   Will do.





[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994897214


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 3]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait.AsOfJoinRel",
+              "on": {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 0,
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              "by": [

Review Comment:
   No, both are common to all tables. The "by" field is a vector of column references, i.e., it can select multiple columns to form the "by" key. If we want per-input field references, we'd need a vector "on" and a vector-of-vectors "by".
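A minimal sketch of the two message shapes being compared, modeled as plain C++ structs with field references reduced to column indices. `SharedKeys`, `PerInputKeys`, and `Expand` are hypothetical names, not part of the PR or of Arrow.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Current shape: one "on" key and one "by" list, shared by every input.
struct SharedKeys {
  int on;
  std::vector<int> by;  // may select several columns for a composite by-key
};

// Per-input shape: one "on" key and one "by" list per input.
struct PerInputKeys {
  std::vector<int> on;               // on[i] is the on-key of input i
  std::vector<std::vector<int>> by;  // by[i][k] is the k-th by-key of input i
};

// The shared shape is the special case where every input uses the same refs.
PerInputKeys Expand(const SharedKeys& shared, std::size_t n_inputs) {
  PerInputKeys out;
  out.on.assign(n_inputs, shared.on);
  out.by.assign(n_inputs, shared.by);
  return out;
}
```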





[GitHub] [arrow] icexelloss commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1276205243

   @rtpsw Why do we need #14386 for this? Can we separate out these two changes?




[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r993453755


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 3]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait.AsOfJoinRel",
+              "on": {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 0,
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              "by": [
+                {
+                  "selection": {
+                    "directReference": {
+                      "structField": {

Review Comment:
   It means accessing a field by index. It makes Arrow Substrait create a `FieldRef` holding the given field index.





[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994590711


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1030,6 +1031,32 @@ class AsofJoinNode : public ExecNode {
     return match.indices()[0];
   }
 
+  static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
+      const std::vector<std::shared_ptr<Schema>>& input_schema, const FieldRef& on_key) {
+    size_t n_input = input_schema.size();
+    std::vector<col_index_t> indices_of_on_key(n_input);
+    for (size_t i = 0; i < n_input; ++i) {
+      ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
+                            FindColIndex(*input_schema[i], on_key, "on"));

Review Comment:
   The proto uses `Expression` to allow for various ways to specify the key; it can be thought of as future-proof. We can choose which of these ways to support in Arrow Substrait in a first version. The code here supports any kind of `FieldRef`, be it named or indexed, that Arrow Substrait may set up. The Arrow Substrait code only supports an indexed `FieldRef` (see [here](https://github.com/apache/arrow/blob/e8d54ea765ae7ba63b8f42c29ec855d656e85dc8/cpp/src/arrow/engine/substrait/expression_internal.cc#L150)), but it can be extended.
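The following is a simplified, hypothetical analogue of `GetIndicesOfOnKey`: a key given either as a name or as an index is resolved against each input schema, so a named key may land at a different position in each input. It models schemas as lists of column names and uses `std::variant` in place of Arrow's `FieldRef`; unlike the real lookup, it does not report ambiguous matches.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <variant>
#include <vector>

using Schema = std::vector<std::string>;                // column names only
using FieldRefSketch = std::variant<int, std::string>;  // index or name

// Returns the column index of `ref` in `schema`, or std::nullopt if absent.
std::optional<int> FindColumn(const Schema& schema, const FieldRefSketch& ref) {
  if (const int* idx = std::get_if<int>(&ref)) {
    if (*idx >= 0 && *idx < static_cast<int>(schema.size())) return *idx;
    return std::nullopt;
  }
  const std::string& name = std::get<std::string>(ref);
  for (int i = 0; i < static_cast<int>(schema.size()); ++i) {
    if (schema[i] == name) return i;
  }
  return std::nullopt;
}

// Resolves one key against every input; fails if any input lacks it.
std::optional<std::vector<int>> GetIndicesOfKey(const std::vector<Schema>& inputs,
                                                const FieldRefSketch& key) {
  std::vector<int> indices;
  for (const Schema& schema : inputs) {
    std::optional<int> idx = FindColumn(schema, key);
    if (!idx) return std::nullopt;
    indices.push_back(*idx);
  }
  return indices;
}
```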





[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994575138


##########
cpp/src/arrow/compare.cc:
##########
@@ -305,6 +305,11 @@ class RangeDataEqualsImpl {
   Status Visit(const StructType& type) {
     const int32_t num_fields = type.num_fields();
 
+    if (left_.child_data.size() != static_cast<size_t>(num_fields) ||

Review Comment:
   Can we please separate out orthogonal changes?





[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994952758


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 3]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait.AsOfJoinRel",
+              "on": {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 0,
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              "by": [

Review Comment:
   I see - this makes sense now.
   
   Unfortunately, I think the limitation is too strict (it requires all input tables to have the same column index for the by key).
   The restriction on the on-key is OK, since we already force "time" to be the first column. However, we have no such restriction on the by-key column; it can have a different column index in each input table.
   
   I think we need to fix this problem for As-of Join to be useful. Since we are deciding the shape of the Substrait message now, I think it makes sense to fix both the message format and the as-of join options format in this PR.
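A small illustration of the concern, assuming a hypothetical second table whose `key` column sits at index 2 instead of 1 (the test plan above uses index 1 in both tables). A single by-key index shared across inputs then picks the wrong column in the second table, while per-input indices pick `key` in both. `SelectShared` and `SelectPerInput` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

using Schema = std::vector<std::string>;  // column names only

// Column names selected when one by-key index is shared by all inputs.
std::vector<std::string> SelectShared(const std::vector<Schema>& inputs, int by_index) {
  std::vector<std::string> picked;
  for (const Schema& s : inputs) picked.push_back(s.at(static_cast<std::size_t>(by_index)));
  return picked;
}

// Column names selected when each input has its own by-key index.
std::vector<std::string> SelectPerInput(const std::vector<Schema>& inputs,
                                        const std::vector<int>& by_indices) {
  std::vector<std::string> picked;
  for (std::size_t i = 0; i < inputs.size(); ++i) {
    picked.push_back(inputs[i].at(static_cast<std::size_t>(by_indices[i])));
  }
  return picked;
}
```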
   








[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994580310


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -951,28 +953,27 @@ class AsofJoinNode : public ExecNode {
   }
 
   static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
-      const std::vector<ExecNode*>& inputs,
+      const std::vector<std::shared_ptr<Schema>> input_schema,

Review Comment:
   This allows exposing `MakeOutputSchema` to a caller that has a schema rather than an `ExecNode`, as is done in the tester.
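To illustrate why a schema-based signature is convenient, here is a sketch of output-schema construction that needs only schemas, no `ExecNode`. It models schemas as lists of column names and assumes (not verified against the actual `asof_join_node.cc`) that on-key and by-key columns are emitted once, from the left input, while all other columns of every input are kept. `MakeOutputSchemaSketch` is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

using Schema = std::vector<std::string>;  // column names only

Schema MakeOutputSchemaSketch(const std::vector<Schema>& input_schema,
                              const std::vector<int>& indices_of_on_key,
                              const std::vector<std::vector<int>>& indices_of_by_key) {
  static const std::vector<int> kNoByKey;  // used when there is no by key at all
  Schema out;
  for (std::size_t j = 0; j < input_schema.size(); ++j) {
    const std::vector<int>& by =
        j < indices_of_by_key.size() ? indices_of_by_key[j] : kNoByKey;
    for (int i = 0; i < static_cast<int>(input_schema[j].size()); ++i) {
      const bool is_key =
          i == indices_of_on_key[j] || std::find(by.begin(), by.end(), i) != by.end();
      // Key columns come only from the left input; other columns from every input.
      if (!is_key || j == 0) out.push_back(input_schema[j][static_cast<std::size_t>(i)]);
    }
  }
  return out;
}
```

Under that assumption, the two tables of the test plan above yield four columns (`time`, `key`, `value1`, `value2`), consistent with its `outputMapping` of `[0, 1, 2, 3]`.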





[GitHub] [arrow] rtpsw closed pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw closed pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension
URL: https://github.com/apache/arrow/pull/14385




[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994736304


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 3]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait.AsOfJoinRel",
+              "on": {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 0,
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              "by": [

Review Comment:
   Shouldn't this be a list of field refs - one for each table in the join?
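A minimal sketch of the per-table alternative this comment suggests. It assumes a hypothetical `ColIndex` alias in place of `col_index_t` and plain column counts in place of Arrow schemas. The `by` keys are stored as one list per input, so each table can place its key columns at different positions, and the lists are then checked for consistency.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for Arrow's col_index_t.
using ColIndex = int;

// by_keys[t] lists the by-key column indices of input table t.
// num_columns[t] is the number of columns in input table t.
// Returns true if there is one list per table, all lists have the same
// length, and every index is in range for its own table.
bool ByKeysAreConsistent(const std::vector<std::vector<ColIndex>>& by_keys,
                         const std::vector<std::size_t>& num_columns) {
  if (by_keys.size() != num_columns.size()) return false;
  const std::size_t n_by = by_keys.empty() ? 0 : by_keys[0].size();
  for (std::size_t t = 0; t < by_keys.size(); ++t) {
    if (by_keys[t].size() != n_by) return false;  // tables disagree on key count
    for (ColIndex idx : by_keys[t]) {
      if (idx < 0 || static_cast<std::size_t>(idx) >= num_columns[t]) return false;
    }
  }
  return true;
}
```

With this layout, `{{1}, {2}}` would mean "column 1 of the first table matches column 2 of the second".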



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -951,28 +953,27 @@ class AsofJoinNode : public ExecNode {
   }
 
   static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
-      const std::vector<ExecNode*>& inputs,
+      const std::vector<std::shared_ptr<Schema>> input_schema,

Review Comment:
   kk sounds reasonable 



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -951,28 +953,27 @@ class AsofJoinNode : public ExecNode {
   }
 
   static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
-      const std::vector<ExecNode*>& inputs,
+      const std::vector<std::shared_ptr<Schema>> input_schema,
       const std::vector<col_index_t>& indices_of_on_key,
       const std::vector<std::vector<col_index_t>>& indices_of_by_key) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
 
-    size_t n_by = indices_of_by_key[0].size();
+    size_t n_by = indices_of_by_key.size() == 0 ? 0 : indices_of_by_key[0].size();

Review Comment:
   When can `indices_of_by_key.size() == 0` happen? Does it mean this is a join without a `by` key?
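A minimal sketch of why the guard matters. It assumes an output rule that is not confirmed in this thread: the output keeps every column of the first input and only the non-key columns of each later input. `OutputColumnCount` and `ColIndex` are hypothetical names. Under that assumption, the two three-column tables in the test plan would produce four output columns, which is consistent with its `outputMapping` of `[0, 1, 2, 3]`.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using ColIndex = int;  // hypothetical stand-in for col_index_t

// ASSUMPTION: input 0 contributes all of its columns; every other input
// contributes its columns minus the on key and the by keys.
std::size_t OutputColumnCount(const std::vector<std::size_t>& num_columns,
                              const std::vector<std::vector<ColIndex>>& indices_of_by_key) {
  // Same guard as in the diff: with no by key the outer vector may be empty,
  // and indices_of_by_key[0] would then be undefined behaviour.
  const std::size_t n_by = indices_of_by_key.empty() ? 0 : indices_of_by_key[0].size();
  std::size_t total = num_columns.empty() ? 0 : num_columns[0];
  for (std::size_t i = 1; i < num_columns.size(); ++i) {
    total += num_columns[i] - 1 - n_by;  // drop the on key and the by keys
  }
  return total;
}
```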



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1030,6 +1031,32 @@ class AsofJoinNode : public ExecNode {
     return match.indices()[0];
   }
 
+  static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
+      const std::vector<std::shared_ptr<Schema>>& input_schema, const FieldRef& on_key) {
+    size_t n_input = input_schema.size();
+    std::vector<col_index_t> indices_of_on_key(n_input);
+    for (size_t i = 0; i < n_input; ++i) {
+      ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
+                            FindColIndex(*input_schema[i], on_key, "on"));

Review Comment:
   I am not sure we need this function; in practice, this Substrait field ref should always be index-based.



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1030,6 +1031,32 @@ class AsofJoinNode : public ExecNode {
     return match.indices()[0];
   }
 
+  static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
+      const std::vector<std::shared_ptr<Schema>>& input_schema, const FieldRef& on_key) {
+    size_t n_input = input_schema.size();
+    std::vector<col_index_t> indices_of_on_key(n_input);
+    for (size_t i = 0; i < n_input; ++i) {
+      ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
+                            FindColIndex(*input_schema[i], on_key, "on"));

Review Comment:
   If we ever support name-based field refs in Substrait (which I doubt we will), I would prefer that we add this function then.
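A minimal sketch of the index-based versus name-based distinction discussed in this thread. It uses a hypothetical `SimpleFieldRef`, a `std::variant` of a position or a name, which is much simpler than `arrow::FieldRef`. An index only needs a bounds check. A name needs a per-schema search, which is the case a helper like `GetIndicesOfOnKey` would serve.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <variant>
#include <vector>

// Hypothetical, simplified field reference: a column position or a column name.
// Arrow's real arrow::FieldRef supports more forms; this models only these two.
using SimpleFieldRef = std::variant<int, std::string>;

// Resolves `ref` against one input's column names; std::nullopt if not found.
std::optional<int> ResolveColumn(const std::vector<std::string>& column_names,
                                 const SimpleFieldRef& ref) {
  const int n = static_cast<int>(column_names.size());
  if (const int* index = std::get_if<int>(&ref)) {
    // Index-based (the form a Substrait structField yields): a bounds check suffices.
    if (*index >= 0 && *index < n) return *index;
    return std::nullopt;
  }
  // Name-based: the schema must be searched, and the name must be unique.
  const std::string& name = std::get<std::string>(ref);
  std::optional<int> match;
  for (int i = 0; i < n; ++i) {
    if (column_names[i] == name) {
      if (match) return std::nullopt;  // ambiguous: the name appears twice
      match = i;
    }
  }
  return match;
}
```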



##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {
+  .substrait.Expression on = 1;

Review Comment:
   We are using `StructField` for this, so I don't think we can use a name here, right?
   
   I think it probably makes more sense to make this a repeated field, in case the `on` column doesn't have the same column index in every table.
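A minimal sketch of how a repeated `on` field could be interpreted. It assumes a hypothetical rule that is not part of this PR: a single entry is shared by all inputs, and otherwise there must be exactly one entry per input table. `OnKeyIndicesPerTable` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// on_spec holds the on-key column index as written in the plan: either one
// index shared by every input, or one index per input table.
std::optional<std::vector<int>> OnKeyIndicesPerTable(const std::vector<int>& on_spec,
                                                     std::size_t n_inputs) {
  if (on_spec.size() == 1) {
    return std::vector<int>(n_inputs, on_spec[0]);  // broadcast the shared index
  }
  if (on_spec.size() == n_inputs && n_inputs > 0) {
    return on_spec;  // one index per table, used as given
  }
  return std::nullopt;  // empty, or a count that does not match the inputs
}
```

Broadcasting a single entry keeps today's plans (one shared `on` index) valid, while still allowing the per-table form.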



##########
cpp/src/arrow/engine/substrait/options.h:
##########
@@ -32,7 +36,7 @@ namespace engine {
 /// How strictly to adhere to the input structure when converting between Substrait and
 /// Acero representations of a plan. This allows the user to trade conversion accuracy
 /// for performance and lenience.
-enum class ConversionStrictness {
+enum class ARROW_ENGINE_EXPORT ConversionStrictness {

Review Comment:
   What does the macro `ARROW_ENGINE_EXPORT` do?
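For context, export macros like this one usually follow a common shared-library pattern. The sketch below uses a hypothetical `MYLIB_EXPORT` rather than Arrow's actual definition, which lives in Arrow's own visibility header and may differ in detail. On Windows the macro expands to `__declspec(dllexport)` or `__declspec(dllimport)`, and on GCC/Clang to a default-visibility attribute, so that a class's symbols are reachable from outside the library. A plain `enum class` has no symbols of its own, so the annotation is probably unnecessary there.

```cpp
#include <cassert>

// Pretend we are compiling the library itself (hypothetical build flag).
#define MYLIB_BUILDING 1

#if defined(_WIN32)
#if defined(MYLIB_BUILDING)
#define MYLIB_EXPORT __declspec(dllexport)  // building the DLL: export symbols
#else
#define MYLIB_EXPORT __declspec(dllimport)  // using the DLL: import symbols
#endif
#elif defined(__GNUC__)
#define MYLIB_EXPORT __attribute__((visibility("default")))  // keep symbols visible
#else
#define MYLIB_EXPORT
#endif

// A polymorphic class has a vtable and member functions, i.e. real symbols
// that library users link against, so it is the typical target of the macro.
class MYLIB_EXPORT Provider {
 public:
  virtual ~Provider() = default;
  virtual int Answer() const { return 42; }
};

// An enum class defines no functions or data, so there is nothing to export.
enum class Strictness { kExact, kBestEffort };
```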



##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <iostream>
+
+#include "arrow/engine/substrait/options.h"
+
+#include <google/protobuf/util/json_util.h>
+#include "arrow/compute/exec/asof_join_node.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/engine/substrait/expression_internal.h"
+#include "arrow/engine/substrait/relation_internal.h"
+#include "substrait/extension_rels.pb.h"
+
+namespace arrow {
+namespace engine {
+
+class DefaultExtensionProvider : public ExtensionProvider {
+ public:
+  Result<DeclarationInfo> MakeRel(const std::vector<DeclarationInfo>& inputs,
+                                  const google::protobuf::Any& rel,
+                                  const ExtensionSet& ext_set) override {
+    if (rel.Is<arrow::substrait::AsOfJoinRel>()) {
+      arrow::substrait::AsOfJoinRel as_of_join_rel;
+      rel.UnpackTo(&as_of_join_rel);
+      return MakeAsOfJoinRel(inputs, as_of_join_rel, ext_set);
+    }
+    return Status::NotImplemented("Unrecognized extension in Susbstrait plan: ",
+                                  rel.DebugString());
+  }
+
+ private:
+  Result<DeclarationInfo> MakeAsOfJoinRel(
+      const std::vector<DeclarationInfo>& inputs,
+      const arrow::substrait::AsOfJoinRel& as_of_join_rel, const ExtensionSet& ext_set) {
+    if (inputs.size() < 2) {
+      return Status::Invalid("substrait::AsOfJoinNode too few input tables: ",
+                             inputs.size());
+    }
+    // on-key
+    if (!as_of_join_rel.has_on()) {
+      return Status::Invalid("substrait::AsOfJoinNode missing on-key");
+    }
+    ARROW_ASSIGN_OR_RAISE(auto on_key_expr, FromProto(as_of_join_rel.on(), ext_set, {}));
+    if (on_key_expr.field_ref() == NULLPTR) {
+      return Status::NotImplemented("substrait::AsOfJoinNode non-field-ref on-key");
+    }
+    const FieldRef& on_key = *on_key_expr.field_ref();
+
+    // by-key
+    std::vector<FieldRef> by_key;

Review Comment:
   I think `by_keys` is a little clearer here, since this is a vector.



##########
cpp/src/arrow/engine/substrait/options.h:
##########
@@ -65,16 +69,27 @@ using NamedTableProvider =
     std::function<Result<compute::Declaration>(const std::vector<std::string>&)>;
 static NamedTableProvider kDefaultNamedTableProvider;
 
+class ARROW_ENGINE_EXPORT ExtensionProvider {
+ public:
+  static std::shared_ptr<ExtensionProvider> kDefaultExtensionProvider;

Review Comment:
   Should we define this the same way that `kDefaultNamedTableProvider` is defined? I don't see a reason why those two should be different.
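A minimal sketch of how the two declarations differ, using a hypothetical `Provider` type. `kDefaultNamedTableProvider` is a namespace-scope `static` in options.h, which gives it internal linkage: every .cc file that includes the header gets its own copy. `kDefaultExtensionProvider` is a static data member declared in the class and defined once in options.cc, so a single object is shared. The function-local static shown last is a further option that the PR does not take.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for ExtensionProvider.
struct Provider {
  virtual ~Provider() = default;
};

// Like kDefaultNamedTableProvider: namespace-scope `static`, internal linkage.
// If this line sat in a header, each including .cc file would get its own copy.
static std::shared_ptr<Provider> kDefaultA = std::make_shared<Provider>();

// Like kDefaultExtensionProvider: declared in the class ...
struct ProviderRegistry {
  static std::shared_ptr<Provider> kDefault;
};
// ... and defined exactly once (in a .cc file), so every user shares one object.
std::shared_ptr<Provider> ProviderRegistry::kDefault = std::make_shared<Provider>();

// Alternative: a function-local static, created on first use (mark it `inline`
// if defined in a header). This sidesteps static-initialization-order issues.
const std::shared_ptr<Provider>& DefaultProvider() {
  static const std::shared_ptr<Provider> instance = std::make_shared<Provider>();
  return instance;
}
```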



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1030,6 +1031,32 @@ class AsofJoinNode : public ExecNode {
     return match.indices()[0];
   }
 
+  static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
+      const std::vector<std::shared_ptr<Schema>>& input_schema, const FieldRef& on_key) {
+    size_t n_input = input_schema.size();
+    std::vector<col_index_t> indices_of_on_key(n_input);
+    for (size_t i = 0; i < n_input; ++i) {
+      ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
+                            FindColIndex(*input_schema[i], on_key, "on"));
+    }
+    return indices_of_on_key;
+  }
+
+  static Result<std::vector<std::vector<col_index_t>>> GetIndicesOfByKey(

Review Comment:
   Same comment as `GetIndicesOfByKey`



##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <iostream>
+
+#include "arrow/engine/substrait/options.h"
+
+#include <google/protobuf/util/json_util.h>
+#include "arrow/compute/exec/asof_join_node.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/engine/substrait/expression_internal.h"
+#include "arrow/engine/substrait/relation_internal.h"
+#include "substrait/extension_rels.pb.h"
+
+namespace arrow {
+namespace engine {
+
+class DefaultExtensionProvider : public ExtensionProvider {
+ public:
+  Result<DeclarationInfo> MakeRel(const std::vector<DeclarationInfo>& inputs,
+                                  const google::protobuf::Any& rel,
+                                  const ExtensionSet& ext_set) override {
+    if (rel.Is<arrow::substrait::AsOfJoinRel>()) {
+      arrow::substrait::AsOfJoinRel as_of_join_rel;
+      rel.UnpackTo(&as_of_join_rel);
+      return MakeAsOfJoinRel(inputs, as_of_join_rel, ext_set);
+    }
+    return Status::NotImplemented("Unrecognized extension in Susbstrait plan: ",
+                                  rel.DebugString());
+  }
+
+ private:
+  Result<DeclarationInfo> MakeAsOfJoinRel(
+      const std::vector<DeclarationInfo>& inputs,
+      const arrow::substrait::AsOfJoinRel& as_of_join_rel, const ExtensionSet& ext_set) {
+    if (inputs.size() < 2) {
+      return Status::Invalid("substrait::AsOfJoinNode too few input tables: ",
+                             inputs.size());
+    }
+    // on-key
+    if (!as_of_join_rel.has_on()) {
+      return Status::Invalid("substrait::AsOfJoinNode missing on-key");
+    }
+    ARROW_ASSIGN_OR_RAISE(auto on_key_expr, FromProto(as_of_join_rel.on(), ext_set, {}));
+    if (on_key_expr.field_ref() == NULLPTR) {
+      return Status::NotImplemented("substrait::AsOfJoinNode non-field-ref on-key");
+    }
+    const FieldRef& on_key = *on_key_expr.field_ref();
+
+    // by-key
+    std::vector<FieldRef> by_key;
+    for (const auto& by_item : as_of_join_rel.by()) {
+      ARROW_ASSIGN_OR_RAISE(auto by_key_expr, FromProto(by_item, ext_set, {}));
+      if (by_key_expr.field_ref() == NULLPTR) {
+        return Status::NotImplemented("substrait::AsOfJoinNode non-field-ref by-key");
+      }
+      by_key.push_back(*by_key_expr.field_ref());
+    }
+
+    // schema
+    int64_t tolerance = as_of_join_rel.tolerance();
+    std::vector<std::shared_ptr<Schema>> input_schema(inputs.size());
+    for (size_t i = 0; i < inputs.size(); i++) {
+      input_schema[i] = inputs[i].output_schema;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        auto schema, compute::asofjoin::MakeOutputSchema(input_schema, on_key, by_key));
+    compute::AsofJoinNodeOptions asofjoin_node_opts{std::move(on_key), std::move(by_key),
+                                                    tolerance};
+
+    // declaration
+    std::vector<compute::Declaration::Input> input_decls(inputs.size());
+    for (size_t i = 0; i < inputs.size(); i++) {
+      input_decls[i] = inputs[i].declaration;
+    }
+    return DeclarationInfo{
+        compute::Declaration("asofjoin", input_decls, std::move(asofjoin_node_opts)),
+        std::move(schema)};
+  }
+};
+
+std::shared_ptr<ExtensionProvider> ExtensionProvider::kDefaultExtensionProvider =

Review Comment:
   Nvm this, I see the declaration there in options.h



##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <iostream>
+
+#include "arrow/engine/substrait/options.h"
+
+#include <google/protobuf/util/json_util.h>
+#include "arrow/compute/exec/asof_join_node.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/engine/substrait/expression_internal.h"
+#include "arrow/engine/substrait/relation_internal.h"
+#include "substrait/extension_rels.pb.h"
+
+namespace arrow {
+namespace engine {
+
+class DefaultExtensionProvider : public ExtensionProvider {
+ public:
+  Result<DeclarationInfo> MakeRel(const std::vector<DeclarationInfo>& inputs,
+                                  const google::protobuf::Any& rel,
+                                  const ExtensionSet& ext_set) override {
+    if (rel.Is<arrow::substrait::AsOfJoinRel>()) {
+      arrow::substrait::AsOfJoinRel as_of_join_rel;
+      rel.UnpackTo(&as_of_join_rel);
+      return MakeAsOfJoinRel(inputs, as_of_join_rel, ext_set);
+    }
+    return Status::NotImplemented("Unrecognized extension in Susbstrait plan: ",
+                                  rel.DebugString());
+  }
+
+ private:
+  Result<DeclarationInfo> MakeAsOfJoinRel(
+      const std::vector<DeclarationInfo>& inputs,
+      const arrow::substrait::AsOfJoinRel& as_of_join_rel, const ExtensionSet& ext_set) {
+    if (inputs.size() < 2) {
+      return Status::Invalid("substrait::AsOfJoinNode too few input tables: ",
+                             inputs.size());
+    }
+    // on-key
+    if (!as_of_join_rel.has_on()) {
+      return Status::Invalid("substrait::AsOfJoinNode missing on-key");
+    }
+    ARROW_ASSIGN_OR_RAISE(auto on_key_expr, FromProto(as_of_join_rel.on(), ext_set, {}));
+    if (on_key_expr.field_ref() == NULLPTR) {
+      return Status::NotImplemented("substrait::AsOfJoinNode non-field-ref on-key");
+    }
+    const FieldRef& on_key = *on_key_expr.field_ref();
+
+    // by-key
+    std::vector<FieldRef> by_key;
+    for (const auto& by_item : as_of_join_rel.by()) {
+      ARROW_ASSIGN_OR_RAISE(auto by_key_expr, FromProto(by_item, ext_set, {}));
+      if (by_key_expr.field_ref() == NULLPTR) {
+        return Status::NotImplemented("substrait::AsOfJoinNode non-field-ref by-key");
+      }
+      by_key.push_back(*by_key_expr.field_ref());
+    }
+
+    // schema
+    int64_t tolerance = as_of_join_rel.tolerance();
+    std::vector<std::shared_ptr<Schema>> input_schema(inputs.size());
+    for (size_t i = 0; i < inputs.size(); i++) {
+      input_schema[i] = inputs[i].output_schema;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        auto schema, compute::asofjoin::MakeOutputSchema(input_schema, on_key, by_key));
+    compute::AsofJoinNodeOptions asofjoin_node_opts{std::move(on_key), std::move(by_key),
+                                                    tolerance};
+
+    // declaration
+    std::vector<compute::Declaration::Input> input_decls(inputs.size());
+    for (size_t i = 0; i < inputs.size(); i++) {
+      input_decls[i] = inputs[i].declaration;
+    }
+    return DeclarationInfo{
+        compute::Declaration("asofjoin", input_decls, std::move(asofjoin_node_opts)),
+        std::move(schema)};
+  }
+};
+
+std::shared_ptr<ExtensionProvider> ExtensionProvider::kDefaultExtensionProvider =

Review Comment:
   Should we move this to substrait/options.h file? (Since the `kDefaultNamedTableProvider` is also there)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994574143


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {
+  .substrait.Expression on = 1;

Review Comment:
   Does this require the `on` column to have the same index in all tables?




[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994575755


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -951,28 +953,27 @@ class AsofJoinNode : public ExecNode {
   }
 
   static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
-      const std::vector<ExecNode*>& inputs,
+      const std::vector<std::shared_ptr<Schema>> input_schema,

Review Comment:
   Why this change?




[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1276225114

   > @westonpace The changes here depend on your local change in https://github.com/westonpace/arrow/tree/experiment/substrait-extension
   
   The code here reuses most of Weston's code. The main differences are that the "dummy" `DelayRel` was replaced with `AsOfJoinRel` (including the corresponding test code) and that the code was refactored a bit. So I think the code here should go in without a merge or a rebase.



[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994881827


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1030,6 +1031,32 @@ class AsofJoinNode : public ExecNode {
     return match.indices()[0];
   }
 
+  static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
+      const std::vector<std::shared_ptr<Schema>>& input_schema, const FieldRef& on_key) {
+    size_t n_input = input_schema.size();
+    std::vector<col_index_t> indices_of_on_key(n_input);
+    for (size_t i = 0; i < n_input; ++i) {
+      ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
+                            FindColIndex(*input_schema[i], on_key, "on"));
+    }
+    return indices_of_on_key;
+  }
+
+  static Result<std::vector<std::vector<col_index_t>>> GetIndicesOfByKey(

Review Comment:
   Sorry, which comment? You're referring to `GetIndicesOfByKey` but it's here.




[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1279696843

   CI failures look unrelated to the changes here. @icexelloss, this may be ready to go, WDYT?



[GitHub] [arrow] icexelloss commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1280914327

   @rtpsw Looks pretty good! There seem to be many style changes that look orthogonal. Were those changes made automatically? Is it easy to separate the style changes from the actual change?



[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r993443410


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 3]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait.AsOfJoinRel",
+              "on": {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 0,
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              "by": [
+                {
+                  "selection": {
+                    "directReference": {
+                      "structField": {

Review Comment:
   What does the "structField" mean here?
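For readers with the same question: in Substrait, a `directReference` with a `structField` segment selects a field by its zero-based position in the input record. `rootReference` indicates that the position is relative to the incoming row rather than to an outer query. Below is a minimal sketch of what such an index selects in each of the two inputs of the test plan. It uses hypothetical names and plain column-name lists in place of Arrow schemas.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical model: each input is described only by its column names.
using ColumnNames = std::vector<std::string>;

// Returns the name each input has at zero-based position `field`, or
// std::nullopt if the position is out of range for any input.
std::optional<std::vector<std::string>> ColumnAtPosition(
    const std::vector<ColumnNames>& inputs, int field) {
  std::vector<std::string> names;
  for (const ColumnNames& columns : inputs) {
    if (field < 0 || static_cast<std::size_t>(field) >= columns.size()) {
      return std::nullopt;
    }
    names.push_back(columns[static_cast<std::size_t>(field)]);
  }
  return names;
}
```

With `"field": 0`, both inputs resolve to `time`. This also shows why a single shared index assumes the same column layout in every table.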




[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994578624


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {
+  .substrait.Expression on = 1;

Review Comment:
   The proto requires a common expression, which could be a name rather than an index. OTOH, the current Arrow Substrait code in this PR only deals with a field index expression; this can be extended, if desired.




[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994577692


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1030,6 +1031,32 @@ class AsofJoinNode : public ExecNode {
     return match.indices()[0];
   }
 
+  static Result<std::vector<col_index_t>> GetIndicesOfOnKey(
+      const std::vector<std::shared_ptr<Schema>>& input_schema, const FieldRef& on_key) {
+    size_t n_input = input_schema.size();
+    std::vector<col_index_t> indices_of_on_key(n_input);
+    for (size_t i = 0; i < n_input; ++i) {
+      ARROW_ASSIGN_OR_RAISE(indices_of_on_key[i],
+                            FindColIndex(*input_schema[i], on_key, "on"));

Review Comment:
   I thought substrait/FieldRef is index-based already; why do we need to find the index here?





[GitHub] [arrow] github-actions[bot] commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1276336725

   https://issues.apache.org/jira/browse/ARROW-17980




[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r994885711


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -3187,5 +3198,164 @@ TEST(Substrait, IsthmusPlan) {
                        *compute::default_exec_context(), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 3]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait.AsOfJoinRel",
+              "on": {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 0,
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              "by": [

Review Comment:
   As noted in [this post](https://github.com/apache/arrow/pull/14385#discussion_r994865351), the `AsofJoinNode` code currently does not support distinct per-input key references.





[GitHub] [arrow] icexelloss commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r997096806


##########
cpp/src/arrow/engine/substrait/expression_internal.cc:
##########
@@ -52,16 +52,16 @@ Id NormalizeFunctionName(Id id) {
 
 }  // namespace
 
-Status DecodeArg(const substrait::FunctionArgument& arg, uint32_t idx,
+Status DecodeArg(const ::substrait::FunctionArgument& arg, uint32_t idx,

Review Comment:
   Hmm... are these style changes orthogonal?





[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1283762465

   > > @rtpsw Looks pretty good! There seem to be many style changes that seem orthogonal. Are those changes automatically made? Is it easy to separate out the style change from the actual change?
   > 
   > If we insist, we could take these style changes out to a separate PR, get that merged, and then come back here to rebase.
   
   Anja suggested changing `arrow::engine::substrait` to `arrow::engine::substrait_ext` to avoid MSVC name conflicts with symbols in `::substrait`. This should allow us to keep using `substrait::` instead of `::substrait::` and avoid a large orthogonal change.




[GitHub] [arrow] icexelloss commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
icexelloss commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1288966778

   @rtpsw You don't need to open a new PR; you can just force-push to your branch ARROW-17980 and fix the commit history.




[GitHub] [arrow] rtpsw commented on pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #14385:
URL: https://github.com/apache/arrow/pull/14385#issuecomment-1288086193

   > @rtpsw Looks pretty good! There seem to be many style changes that seem orthogonal. Are those changes automatically made? Is it easy to separate out the style change from the actual change?
   
   @icexelloss, I managed to minimize the orthogonal change (`namespace substrait = ::substrait` added to a few files). I think this should be enough to make reviewing convenient.




[GitHub] [arrow] rtpsw commented on a diff in pull request #14385: ARROW-17980: [C++] As-of-Join Substrait extension

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #14385:
URL: https://github.com/apache/arrow/pull/14385#discussion_r1002701934


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {

Review Comment:
   Done.



##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
+
+import "substrait/algebra.proto";
+
+option csharp_namespace = "Arrow.Substrait";
+option go_package = "github.com/apache/arrow/substrait";
+option java_multiple_files = true;
+option java_package = "io.arrow.substrait";
+
+message AsOfJoinRel {
+  repeated AsOfJoinKeys input_keys = 1;

Review Comment:
   Done.


