Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/18 22:54:38 UTC

[GitHub] [arrow] Dandandan opened a new pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Dandandan opened a new pull request #8965:
URL: https://github.com/apache/arrow/pull/8965


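   This PR stops building an index (hash table) for the probe side of the join. Only the build side is indexed, and each probe row is looked up against that index directly.
   
   This has a big impact on join performance, e.g. query 12 has a >3x speedup.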
   
   FYI @andygrove @jorgecarleitao  
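
For readers unfamiliar with the approach, here is a minimal sketch of an inner hash join that indexes only the build (left) side and probes it row by row with the right side. It is not the DataFusion implementation: the function names `build_left_index` and `inner_join_indices` are hypothetical, and plain `i64` keys stand in for the byte keys that the real code derives from `RecordBatch` columns.

```rust
use std::collections::HashMap;

/// Build a hash table over the left (build) side only: key -> left row indices.
/// Hypothetical helper; `i64` keys are an assumption made for illustration.
fn build_left_index(left_keys: &[i64]) -> HashMap<i64, Vec<usize>> {
    let mut index: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, key) in left_keys.iter().enumerate() {
        index.entry(*key).or_default().push(row);
    }
    index
}

/// Probe the left index with every right row. No hash table is built for
/// the right side; each row is looked up once and matching pairs are emitted.
fn inner_join_indices(
    left_index: &HashMap<i64, Vec<usize>>,
    right_keys: &[i64],
) -> Vec<(usize, usize)> {
    let mut pairs = Vec::new();
    for (right_row, key) in right_keys.iter().enumerate() {
        if let Some(left_rows) = left_index.get(key) {
            for left_row in left_rows {
                pairs.push((*left_row, right_row));
            }
        }
    }
    pairs
}

fn main() {
    let left_index = build_left_index(&[1, 2, 2, 3]);
    let pairs = inner_join_indices(&left_index, &[2, 4, 1]);
    // Each pair is (left row, right row).
    println!("{:?}", pairs);
}
```

Under these assumptions the probe side costs one lookup per row and no extra allocation for a second map, which is where a saving of this kind would be expected to come from.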


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] codecov-io edited a comment on pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#issuecomment-748366419


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=h1) Report
   > Merging [#8965](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=desc) (5159ef2) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **increase** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8965/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #8965   +/-   ##
   =======================================
     Coverage   83.25%   83.26%           
   =======================================
     Files         196      196           
     Lines       48116    48196   +80     
   =======================================
   + Hits        40059    40129   +70     
   - Misses       8057     8067   +10     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.19% <100.00%> (+0.09%)` | :arrow_up: |
   | [rust/parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJheV9yZWFkZXIucnM=) | `77.00% <0.00%> (-0.56%)` | :arrow_down: |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `91.31% <0.00%> (-0.50%)` | :arrow_down: |
   | [rust/parquet/src/file/statistics.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL3N0YXRpc3RpY3MucnM=) | `93.80% <0.00%> (ø)` | |
   | [rust/arrow/src/array/array\_binary.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvYXJyYXlfYmluYXJ5LnJz) | `90.73% <0.00%> (+0.21%)` | :arrow_up: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `90.19% <0.00%> (+0.26%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `95.62% <0.00%> (+0.30%)` | :arrow_up: |
   | [rust/parquet/src/arrow/arrow\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJvd19yZWFkZXIucnM=) | `91.25% <0.00%> (+0.66%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.82% <0.00%> (+0.77%)` | :arrow_up: |
   | [rust/datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BhcnF1ZXQucnM=) | `79.56% <0.00%> (+0.91%)` | :arrow_up: |
   | ... and [1 more](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=footer). Last update [d65ba4e...5159ef2](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] codecov-io edited a comment on pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#issuecomment-748366419


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=h1) Report
   > Merging [#8965](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=desc) (dff61d1) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **increase** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8965/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #8965   +/-   ##
   =======================================
     Coverage   83.25%   83.25%           
   =======================================
     Files         196      196           
     Lines       48116    48195   +79     
   =======================================
   + Hits        40059    40127   +68     
   - Misses       8057     8068   +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.16% <100.00%> (+0.07%)` | :arrow_up: |
   | [rust/parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJheV9yZWFkZXIucnM=) | `77.00% <0.00%> (-0.56%)` | :arrow_down: |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `91.31% <0.00%> (-0.50%)` | :arrow_down: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   | [rust/parquet/src/file/statistics.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL3N0YXRpc3RpY3MucnM=) | `93.80% <0.00%> (ø)` | |
   | [rust/arrow/src/array/array\_binary.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvYXJyYXlfYmluYXJ5LnJz) | `90.73% <0.00%> (+0.21%)` | :arrow_up: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `90.19% <0.00%> (+0.26%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `95.62% <0.00%> (+0.30%)` | :arrow_up: |
   | [rust/parquet/src/arrow/arrow\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJvd19yZWFkZXIucnM=) | `91.25% <0.00%> (+0.66%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.82% <0.00%> (+0.77%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=footer). Last update [d65ba4e...9ed27d5](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] github-actions[bot] commented on pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#issuecomment-748359398


   https://issues.apache.org/jira/browse/ARROW-10968





[GitHub] [arrow] Dandandan commented on a change in pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#discussion_r546142043



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -423,71 +419,87 @@ fn build_batch(
 // (1, 0)     (1, 2)
 fn build_join_indexes(
     left: &JoinHashMap,
-    right: &JoinHashMap,
+    right: &RecordBatch,
     join_type: &JoinType,
+    on: &HashSet<String>,
 ) -> Result<Vec<(JoinIndex, JoinIndex)>> {
+    let keys_values = on
+        .iter()
+        .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows())))
+        .collect::<Result<Vec<_>>>()?;
+
+    let mut key = Vec::with_capacity(keys_values.len());
+
     match join_type {
         JoinType::Inner => {
-            // inner => key intersection
-            // unfortunately rust does not support intersection of map keys :(
-            let left_set: HashSet<Vec<u8>> = left.keys().cloned().collect();
-            let left_right: HashSet<Vec<u8>> = right.keys().cloned().collect();
-            let inner = left_set.intersection(&left_right);
-
             let mut indexes = Vec::new(); // unknown a prior size
-            for key in inner {
-                // the unwrap never happens by construction of the key
-                let left_indexes = left.get(key).unwrap();
-                let right_indexes = right.get(key).unwrap();
+
+            // Visit all of the right rows
+            for row in 0..right.num_rows() {
+                // Get the key and find it in the build index
+                create_key(&keys_values, row, &mut key)?;
+                let left_indexes = left.get(&key);
 
                 // for every item on the left and right with this key, add the respective pair
-                left_indexes.iter().for_each(|x| {
-                    right_indexes.iter().for_each(|y| {
-                        // on an inner join, left and right indices are present
-                        indexes.push((Some(*x), Some(*y)));
-                    })
+                left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
+                    // on an inner join, left and right indices are present
+                    indexes.push((Some(*x), Some((0, row))));
                 })
             }
             Ok(indexes)
         }
         JoinType::Left => {
-            // left => left keys
             let mut indexes = Vec::new(); // unknown a prior size
-            for (key, left_indexes) in left {
-                // for every item on the left and right with this key, add the respective pair
-                if let Some(right_indexes) = right.get(key) {
-                    left_indexes.iter().for_each(|x| {
-                        right_indexes.iter().for_each(|y| {
-                            // on an inner join, left and right indices are present
-                            indexes.push((Some(*x), Some(*y)));
+
+            // Keep track of which item is visited in the build input
+            // TODO: this can be stored more efficiently with a marker
+            let mut is_visited = HashSet::new();
+
+            // First visit all of the rows
+            for row in 0..right.num_rows() {
+                create_key(&keys_values, row, &mut key)?;
+                // the unwrap never happens by construction of the key
+                let left_indexes = left.get(&key);
+
+                match left_indexes {
+                    Some(indices) => {
+                        is_visited.insert(key.clone());
+
+                        indices.iter().for_each(|x| {
+                            indexes.push((Some(*x), Some((0, row))));
                         })
-                    })
-                } else {
-                    // key not on the right => push Nones
-                    left_indexes.iter().for_each(|x| {

Review comment:
       Isn't this wrong already? Shouldn't it visit all batches before adding nulls for the left side?







[GitHub] [arrow] Dandandan commented on a change in pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#discussion_r546209575



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########

Review comment:
       But I think this should be resolved in another PR. I think the best approach would be to create/keep a bitmap for each index on the left.
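
One possible reading of the bitmap idea, as a minimal sketch rather than the fix that was eventually implemented: keep one "visited" flag per left row, set it whenever any right batch matches that row, and emit the unmatched left rows only after every batch has been probed. The function name `left_join_indices` is hypothetical, `i64` keys are assumed for brevity, and a `Vec<bool>` stands in for a real bitmap.

```rust
use std::collections::HashMap;

/// Left join over several probe batches. Each output entry is
/// (left row, Some((batch, row))) for a match, or (left row, None)
/// for a left row that matched nothing in any batch.
fn left_join_indices(
    left_keys: &[i64],
    right_batches: &[Vec<i64>],
) -> Vec<(usize, Option<(usize, usize)>)> {
    // Index the build (left) side once.
    let mut left_index: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, key) in left_keys.iter().enumerate() {
        left_index.entry(*key).or_default().push(row);
    }

    // One flag per left row; a Vec<bool> stands in for a bitmap here.
    let mut visited = vec![false; left_keys.len()];
    let mut out = Vec::new();

    // Probe with every right batch, marking the left rows that matched.
    for (batch, keys) in right_batches.iter().enumerate() {
        for (row, key) in keys.iter().enumerate() {
            if let Some(left_rows) = left_index.get(key) {
                for &left_row in left_rows {
                    visited[left_row] = true;
                    out.push((left_row, Some((batch, row))));
                }
            }
        }
    }

    // Only after all batches are seen: emit left rows that never matched.
    for (left_row, seen) in visited.iter().enumerate() {
        if !*seen {
            out.push((left_row, None));
        }
    }
    out
}

fn main() {
    let result = left_join_indices(&[1, 2, 3], &[vec![2], vec![1, 2]]);
    println!("{:?}", result);
}
```

In this example left row 0 (key 1) has no match in the first batch but does in the second, so emitting nulls per batch would produce a spurious unmatched row; deferring the null rows until the end avoids that.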







[GitHub] [arrow] codecov-io edited a comment on pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#issuecomment-748366419


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=h1) Report
   > Merging [#8965](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=desc) (9ed27d5) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **increase** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8965/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #8965   +/-   ##
   =======================================
     Coverage   83.25%   83.25%           
   =======================================
     Files         196      196           
     Lines       48116    48195   +79     
   =======================================
   + Hits        40059    40127   +68     
   - Misses       8057     8068   +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.16% <100.00%> (+0.07%)` | :arrow_up: |
   | [rust/parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJheV9yZWFkZXIucnM=) | `77.00% <0.00%> (-0.56%)` | :arrow_down: |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `91.31% <0.00%> (-0.50%)` | :arrow_down: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   | [rust/parquet/src/file/statistics.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL3N0YXRpc3RpY3MucnM=) | `93.80% <0.00%> (ø)` | |
   | [rust/arrow/src/array/array\_binary.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvYXJyYXlfYmluYXJ5LnJz) | `90.73% <0.00%> (+0.21%)` | :arrow_up: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `90.19% <0.00%> (+0.26%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `95.62% <0.00%> (+0.30%)` | :arrow_up: |
   | [rust/parquet/src/arrow/arrow\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJvd19yZWFkZXIucnM=) | `91.25% <0.00%> (+0.66%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.82% <0.00%> (+0.77%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=footer). Last update [d65ba4e...9ed27d5](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] codecov-io commented on pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#issuecomment-748366419


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=h1) Report
   > Merging [#8965](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=desc) (7c03853) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **increase** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8965/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #8965   +/-   ##
   =======================================
     Coverage   83.25%   83.26%           
   =======================================
     Files         196      196           
     Lines       48116    48196   +80     
   =======================================
   + Hits        40059    40128   +69     
   - Misses       8057     8068   +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.19% <100.00%> (+0.09%)` | :arrow_up: |
   | [rust/parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJheV9yZWFkZXIucnM=) | `77.00% <0.00%> (-0.56%)` | :arrow_down: |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `91.31% <0.00%> (-0.50%)` | :arrow_down: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   | [rust/parquet/src/file/statistics.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL3N0YXRpc3RpY3MucnM=) | `93.80% <0.00%> (ø)` | |
   | [rust/arrow/src/array/array\_binary.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvYXJyYXlfYmluYXJ5LnJz) | `90.73% <0.00%> (+0.21%)` | :arrow_up: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `90.19% <0.00%> (+0.26%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `95.62% <0.00%> (+0.30%)` | :arrow_up: |
   | [rust/parquet/src/arrow/arrow\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJvd19yZWFkZXIucnM=) | `91.25% <0.00%> (+0.66%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.82% <0.00%> (+0.77%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=footer). Last update [d65ba4e...5159ef2](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on a change in pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#discussion_r546230466



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########

Review comment:
       Opened https://issues.apache.org/jira/browse/ARROW-10971 for this







[GitHub] [arrow] Dandandan commented on a change in pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#discussion_r546209575



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -423,71 +419,87 @@ fn build_batch(
 // (1, 0)     (1, 2)
 fn build_join_indexes(
     left: &JoinHashMap,
-    right: &JoinHashMap,
+    right: &RecordBatch,
     join_type: &JoinType,
+    on: &HashSet<String>,
 ) -> Result<Vec<(JoinIndex, JoinIndex)>> {
+    let keys_values = on
+        .iter()
+        .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows())))
+        .collect::<Result<Vec<_>>>()?;
+
+    let mut key = Vec::with_capacity(keys_values.len());
+
     match join_type {
         JoinType::Inner => {
-            // inner => key intersection
-            // unfortunately rust does not support intersection of map keys :(
-            let left_set: HashSet<Vec<u8>> = left.keys().cloned().collect();
-            let left_right: HashSet<Vec<u8>> = right.keys().cloned().collect();
-            let inner = left_set.intersection(&left_right);
-
             let mut indexes = Vec::new(); // unknown a prior size
-            for key in inner {
-                // the unwrap never happens by construction of the key
-                let left_indexes = left.get(key).unwrap();
-                let right_indexes = right.get(key).unwrap();
+
+            // Visit all of the right rows
+            for row in 0..right.num_rows() {
+                // Get the key and find it in the build index
+                create_key(&keys_values, row, &mut key)?;
+                let left_indexes = left.get(&key);
 
                 // for every item on the left and right with this key, add the respective pair
-                left_indexes.iter().for_each(|x| {
-                    right_indexes.iter().for_each(|y| {
-                        // on an inner join, left and right indices are present
-                        indexes.push((Some(*x), Some(*y)));
-                    })
+                left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
+                    // on an inner join, left and right indices are present
+                    indexes.push((Some(*x), Some((0, row))));
                 })
             }
             Ok(indexes)
         }
         JoinType::Left => {
-            // left => left keys
             let mut indexes = Vec::new(); // unknown a prior size
-            for (key, left_indexes) in left {
-                // for every item on the left and right with this key, add the respective pair
-                if let Some(right_indexes) = right.get(key) {
-                    left_indexes.iter().for_each(|x| {
-                        right_indexes.iter().for_each(|y| {
-                            // on an inner join, left and right indices are present
-                            indexes.push((Some(*x), Some(*y)));
+
+            // Keep track of which item is visited in the build input
+            // TODO: this can be stored more efficiently with a marker
+            let mut is_visited = HashSet::new();
+
+            // First visit all of the rows
+            for row in 0..right.num_rows() {
+                create_key(&keys_values, row, &mut key)?;
+                // the unwrap never happens by construction of the key
+                let left_indexes = left.get(&key);
+
+                match left_indexes {
+                    Some(indices) => {
+                        is_visited.insert(key.clone());
+
+                        indices.iter().for_each(|x| {
+                            indexes.push((Some(*x), Some((0, row))));
                         })
-                    })
-                } else {
-                    // key not on the right => push Nones
-                    left_indexes.iter().for_each(|x| {

Review comment:
       But I think this should be resolved in another PR. I think the best approach would be to create and keep a bitmap, with an entry for each index on the left, during the join.
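
A rough sketch of the bitmap idea, to make the suggestion concrete. This is not the DataFusion implementation: `LeftIndex`, `Pair`, `probe_batch` and `emit_unmatched_left` are hypothetical names, the left side is simplified to a plain row number, and a `Vec<bool>` stands in for a real bitmap.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the build-side index: join key -> left row numbers.
type LeftIndex = HashMap<Vec<u8>, Vec<usize>>;
// (left row, (right batch, right row)); None means "no row on that side".
type Pair = (Option<usize>, Option<(usize, usize)>);

/// Probe one right batch, given as one serialized key per right row.
/// Every left row that finds a match is marked in `visited_left`.
fn probe_batch(
    left: &LeftIndex,
    right_keys: &[Vec<u8>],
    batch: usize,
    visited_left: &mut [bool],
    out: &mut Vec<Pair>,
) {
    for (row, key) in right_keys.iter().enumerate() {
        if let Some(left_rows) = left.get(key) {
            for &l in left_rows {
                visited_left[l] = true;
                out.push((Some(l), Some((batch, row))));
            }
        }
    }
}

/// Call once, after the last right batch has been probed: each left row
/// that never matched is emitted a single time with no right partner.
fn emit_unmatched_left(visited_left: &[bool], out: &mut Vec<Pair>) {
    for (l, seen) in visited_left.iter().enumerate() {
        if !*seen {
            out.push((Some(l), None));
        }
    }
}
```

The marker is sized to the number of left rows and shared across all `probe_batch` calls, so it replaces the per-call `HashSet` of cloned keys with one flag per row.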







[GitHub] [arrow] codecov-io edited a comment on pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#issuecomment-748366419


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=h1) Report
   > Merging [#8965](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=desc) (1985647) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **increase** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8965/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #8965   +/-   ##
   =======================================
     Coverage   83.25%   83.25%           
   =======================================
     Files         196      196           
     Lines       48116    48195   +79     
   =======================================
   + Hits        40059    40127   +68     
   - Misses       8057     8068   +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.16% <100.00%> (+0.07%)` | :arrow_up: |
   | [rust/parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJheV9yZWFkZXIucnM=) | `77.00% <0.00%> (-0.56%)` | :arrow_down: |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `91.31% <0.00%> (-0.50%)` | :arrow_down: |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   | [rust/parquet/src/file/statistics.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL3N0YXRpc3RpY3MucnM=) | `93.80% <0.00%> (ø)` | |
   | [rust/arrow/src/array/array\_binary.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvYXJyYXlfYmluYXJ5LnJz) | `90.73% <0.00%> (+0.21%)` | :arrow_up: |
   | [rust/parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9zY2hlbWEvdHlwZXMucnM=) | `90.19% <0.00%> (+0.26%)` | :arrow_up: |
   | [rust/datafusion/src/datasource/parquet.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9kYXRhc291cmNlL3BhcnF1ZXQucnM=) | `95.62% <0.00%> (+0.30%)` | :arrow_up: |
   | [rust/parquet/src/arrow/arrow\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJvd19yZWFkZXIucnM=) | `91.25% <0.00%> (+0.66%)` | :arrow_up: |
   | [rust/parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL21ldGFkYXRhLnJz) | `91.82% <0.00%> (+0.77%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/arrow/pull/8965/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=footer). Last update [d65ba4e...9ed27d5](https://codecov.io/gh/apache/arrow/pull/8965?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove closed pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #8965:
URL: https://github.com/apache/arrow/pull/8965


   





[GitHub] [arrow] Dandandan commented on a change in pull request #8965: ARROW-10968: [Rust][DataFusion] Don't build hash table for right side of join

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8965:
URL: https://github.com/apache/arrow/pull/8965#discussion_r546142043



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -423,71 +419,87 @@ fn build_batch(
 // (1, 0)     (1, 2)
 fn build_join_indexes(
     left: &JoinHashMap,
-    right: &JoinHashMap,
+    right: &RecordBatch,
     join_type: &JoinType,
+    on: &HashSet<String>,
 ) -> Result<Vec<(JoinIndex, JoinIndex)>> {
+    let keys_values = on
+        .iter()
+        .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows())))
+        .collect::<Result<Vec<_>>>()?;
+
+    let mut key = Vec::with_capacity(keys_values.len());
+
     match join_type {
         JoinType::Inner => {
-            // inner => key intersection
-            // unfortunately rust does not support intersection of map keys :(
-            let left_set: HashSet<Vec<u8>> = left.keys().cloned().collect();
-            let left_right: HashSet<Vec<u8>> = right.keys().cloned().collect();
-            let inner = left_set.intersection(&left_right);
-
             let mut indexes = Vec::new(); // unknown a prior size
-            for key in inner {
-                // the unwrap never happens by construction of the key
-                let left_indexes = left.get(key).unwrap();
-                let right_indexes = right.get(key).unwrap();
+
+            // Visit all of the right rows
+            for row in 0..right.num_rows() {
+                // Get the key and find it in the build index
+                create_key(&keys_values, row, &mut key)?;
+                let left_indexes = left.get(&key);
 
                 // for every item on the left and right with this key, add the respective pair
-                left_indexes.iter().for_each(|x| {
-                    right_indexes.iter().for_each(|y| {
-                        // on an inner join, left and right indices are present
-                        indexes.push((Some(*x), Some(*y)));
-                    })
+                left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
+                    // on an inner join, left and right indices are present
+                    indexes.push((Some(*x), Some((0, row))));
                 })
             }
             Ok(indexes)
         }
         JoinType::Left => {
-            // left => left keys
             let mut indexes = Vec::new(); // unknown a prior size
-            for (key, left_indexes) in left {
-                // for every item on the left and right with this key, add the respective pair
-                if let Some(right_indexes) = right.get(key) {
-                    left_indexes.iter().for_each(|x| {
-                        right_indexes.iter().for_each(|y| {
-                            // on an inner join, left and right indices are present
-                            indexes.push((Some(*x), Some(*y)));
+
+            // Keep track of which item is visited in the build input
+            // TODO: this can be stored more efficiently with a marker
+            let mut is_visited = HashSet::new();
+
+            // First visit all of the rows
+            for row in 0..right.num_rows() {
+                create_key(&keys_values, row, &mut key)?;
+                // the unwrap never happens by construction of the key
+                let left_indexes = left.get(&key);
+
+                match left_indexes {
+                    Some(indices) => {
+                        is_visited.insert(key.clone());
+
+                        indices.iter().for_each(|x| {
+                            indexes.push((Some(*x), Some((0, row))));
                         })
-                    })
-                } else {
-                    // key not on the right => push Nones
-                    left_indexes.iter().for_each(|x| {

Review comment:
       Isn't this already wrong? Shouldn't it visit all right batches before adding nulls for the left-side rows that had no matches at all?
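
To illustrate the concern with a toy model (not the actual DataFusion code): assuming unmatched left rows are padded with nulls after every right batch, a left row that only matches in a later batch would also show up as a spurious null row. The types and the functions `left_join_pad_per_batch` and `left_join_pad_at_end` below are made up for this sketch, with `u32` keys standing in for the serialized join key.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical build-side index: join key -> left row numbers.
type LeftIndex = HashMap<u32, Vec<usize>>;
// (left row, (right batch, right row)); None means "no row on that side".
type Pair = (Option<usize>, Option<(usize, usize)>);

/// Pads unmatched left rows after *each* right batch (the behaviour in question).
fn left_join_pad_per_batch(left: &LeftIndex, batches: &[Vec<u32>]) -> Vec<Pair> {
    let mut out: Vec<Pair> = Vec::new();
    for (b, keys) in batches.iter().enumerate() {
        let mut seen: HashSet<u32> = HashSet::new();
        for (row, key) in keys.iter().enumerate() {
            if let Some(rows) = left.get(key) {
                seen.insert(*key);
                out.extend(rows.iter().map(|&l| (Some(l), Some((b, row)))));
            }
        }
        // Padding here only knows about the current batch.
        for (key, rows) in left {
            if !seen.contains(key) {
                out.extend(rows.iter().map(|&l| (Some(l), None)));
            }
        }
    }
    out
}

/// Pads unmatched left rows once, after all right batches have been probed.
fn left_join_pad_at_end(left: &LeftIndex, batches: &[Vec<u32>]) -> Vec<Pair> {
    let mut out: Vec<Pair> = Vec::new();
    let mut seen: HashSet<u32> = HashSet::new();
    for (b, keys) in batches.iter().enumerate() {
        for (row, key) in keys.iter().enumerate() {
            if let Some(rows) = left.get(key) {
                seen.insert(*key);
                out.extend(rows.iter().map(|&l| (Some(l), Some((b, row)))));
            }
        }
    }
    for (key, rows) in left {
        if !seen.contains(key) {
            out.extend(rows.iter().map(|&l| (Some(l), None)));
        }
    }
    out
}
```

With left keys 1 and 2 and right batches `[1]` and `[2]`, the per-batch variant returns four rows, two of them null-padded, while the deferred variant returns only the two matching rows.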



