Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/01 12:41:23 UTC

[GitHub] [arrow] Dandandan opened a new pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concat to single batch

Dandandan opened a new pull request #9070:
URL: https://github.com/apache/arrow/pull/9070


   This is based on an earlier PR and is meant to help discuss a solution for https://issues.apache.org/jira/browse/ARROW-11030
   
   The benchmark results look very promising: the change mostly removes the overhead of small batches. With a very small batch size of 64, on TPC-H query 12:
   
   ```
   Query 12 iteration 0 took 1726.5 ms
   Query 12 iteration 1 took 1722.5 ms
   Query 12 iteration 2 took 1781.3 ms
   Query 12 iteration 3 took 1830.6 ms
   ```
   
   On master:
   
   ```
   Query 12 iteration 0 took 11925.1 ms
   ```
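To make the idea concrete, here is a minimal std-only sketch of concatenating many small build-side batches into one. The `Batch` type (equally long `i64` columns) is a hypothetical stand-in for Arrow's `RecordBatch`; the PR itself uses `concat_batches` from `coalesce_batches.rs`, whose exact signature is not shown in this thread.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: equally long `i64` columns.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub columns: Vec<Vec<i64>>,
}

impl Batch {
    pub fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

/// Concatenate batches column by column into a single batch.
/// Assumes every batch has the same number of columns (same schema).
/// Returns `None` when there are no batches to concatenate.
pub fn concat_batches(batches: &[Batch]) -> Option<Batch> {
    let num_columns = batches.first()?.columns.len();
    let total_rows: usize = batches.iter().map(Batch::num_rows).sum();
    // Allocate each output column once, sized for all rows.
    let mut columns: Vec<Vec<i64>> = (0..num_columns)
        .map(|_| Vec::with_capacity(total_rows))
        .collect();
    for batch in batches {
        for (out, col) in columns.iter_mut().zip(&batch.columns) {
            out.extend_from_slice(col);
        }
    }
    Some(Batch { columns })
}
```

After concatenation, a build-side row is addressed by a single row number instead of a `(batch, row)` pair, which is what lets the join gather output rows with one `take` per column.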


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (6d182c9) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **decrease** coverage by `0.00%`.
   > The diff coverage is `85.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.59%   82.59%   -0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50179      +10     
   ==========================================
   + Hits        41436    41443       +7     
   - Misses       8733     8736       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `86.16% <85.33%> (-0.17%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...6d182c9](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551052647



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       That would be a bit less trivial to get rid of than I thought, but at some point you could make a case either for converting those string arrays to use 64-bit indexes or for splitting the batches so that each stays below 2^32 in size, or something like that.
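The "split the batches" option could look roughly like the following sketch: a hypothetical helper that groups consecutive string values, given by their byte lengths, so that each group's total stays within a byte limit. For Arrow's `Utf8` type, whose offsets are 32-bit signed integers, that limit would be `i32::MAX` bytes; it is a parameter here only so the sketch can be exercised with small numbers.

```rust
use std::ops::Range;

/// Group consecutive values (given by their byte lengths) into row ranges whose
/// total byte size does not exceed `limit`. A single value larger than `limit`
/// gets a range of its own, since it cannot be split.
pub fn split_by_byte_limit(lengths: &[usize], limit: usize) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    let mut bytes = 0usize;
    for (i, &len) in lengths.iter().enumerate() {
        // Close the current group if adding this value would exceed the limit.
        if i > start && bytes.saturating_add(len) > limit {
            ranges.push(start..i);
            start = i;
            bytes = 0;
        }
        bytes = bytes.saturating_add(len);
    }
    if start < lengths.len() {
        ranges.push(start..lengths.len());
    }
    ranges
}
```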







[GitHub] [arrow] github-actions[bot] commented on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concat to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753312905


   https://issues.apache.org/jira/browse/ARROW-11030





[GitHub] [arrow] andygrove commented on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753330627


   I'm seeing a 2x improvement in performance for q12 @ SF=100 with batch size 4096 :rocket: 
   
   This should allow us to reduce the default batch size and reduce memory pressure.





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551043968



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       Yes, but as far as I can tell, that only limits the maximum number of elements to 2^32 for each batch / array on the right?
   I think the string *data* per right record batch itself should be able to hold more than 4 GB because the string offsets are stored separately, or am I wrong about that?
   If this becomes a problem in the future, we can change it to use `u64` for the right indices as well, I think without too much extra cost besides double allocation/memory usage.
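The distinction drawn here, between the *row index* type used by the join and the *offset* type used inside a string array, can be illustrated with a simplified, std-only model of a variable-width string column (an `offsets` buffer plus one contiguous `values` buffer). `LargeStringColumn` and `take_strings` are hypothetical, not Arrow APIs. Rows are addressed with `u32` indices while the offsets are `i64`, so the number of addressable rows is bounded by the index type, whereas the total byte size is bounded by the offset type.

```rust
/// Simplified model of a variable-width string column: row `i` spans
/// `values[offsets[i]..offsets[i + 1]]`. Offsets are 64-bit in this sketch.
pub struct LargeStringColumn {
    pub offsets: Vec<i64>,
    pub values: Vec<u8>,
}

impl LargeStringColumn {
    pub fn from_strs(strs: &[&str]) -> Self {
        let mut offsets = Vec::with_capacity(strs.len() + 1);
        let mut values = Vec::new();
        offsets.push(0i64);
        for s in strs {
            values.extend_from_slice(s.as_bytes());
            offsets.push(values.len() as i64);
        }
        LargeStringColumn { offsets, values }
    }

    pub fn value(&self, row: usize) -> &str {
        let start = self.offsets[row] as usize;
        let end = self.offsets[row + 1] as usize;
        std::str::from_utf8(&self.values[start..end]).expect("valid UTF-8")
    }
}

/// Gather rows by `u32` row index, as a `take` kernel would.
/// The index width limits how many rows can be addressed, not how many bytes.
pub fn take_strings(column: &LargeStringColumn, indices: &[u32]) -> Vec<String> {
    indices
        .iter()
        .map(|&i| column.value(i as usize).to_string())
        .collect()
}
```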







[GitHub] [arrow] alamb closed pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #9070:
URL: https://github.com/apache/arrow/pull/9070


   





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r550959182



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -479,37 +490,40 @@ fn build_batch(
 fn build_join_indexes(
     left: &JoinHashMap,
     right: &RecordBatch,
-    join_type: &JoinType,
+    join_type: JoinType,
     right_on: &HashSet<String>,
-) -> Result<Vec<(JoinIndex, RightIndex)>> {
+) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = right_on
         .iter()
         .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows())))
         .collect::<Result<Vec<_>>>()?;
 
     let mut key = Vec::with_capacity(keys_values.len());
 
+    let mut left_indices = UInt64Builder::new(0);

Review comment:
       It seems better for an inner join *not* to use a builder / allocate a null bitmap. What is currently the most ergonomic way to do this?
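For an inner join every emitted index pair is a real match, so the indices can be collected into plain vectors with no validity information. Only an outer join, for example a right join with the left side as the build side, needs to represent "no match". Below is a std-only sketch of that split; the `HashMap<i64, Vec<u64>>` from key to left-row indices is a simplifying assumption (the PR's map is keyed by `Vec<u8>`). As far as I know, arrow-rs can turn a `Vec<u64>` into a `UInt64Array` without a null buffer via `From`, and a `Vec<Option<u64>>` into one with nulls, but check that against the arrow version in use.

```rust
use std::collections::HashMap;

/// Inner join: every pair is a real match, so no validity bitmap is needed.
pub fn inner_join_indices(
    left: &HashMap<i64, Vec<u64>>,
    right_keys: &[i64],
) -> (Vec<u64>, Vec<u32>) {
    let mut left_indices = Vec::new();
    let mut right_indices = Vec::new();
    for (right_row, key) in right_keys.iter().enumerate() {
        if let Some(rows) = left.get(key) {
            for &left_row in rows {
                left_indices.push(left_row);
                right_indices.push(right_row as u32);
            }
        }
    }
    (left_indices, right_indices)
}

/// Right join: a right row without a match gets a `None` left index,
/// which is exactly what a null bitmap would have to encode.
pub fn right_join_indices(
    left: &HashMap<i64, Vec<u64>>,
    right_keys: &[i64],
) -> (Vec<Option<u64>>, Vec<u32>) {
    let mut left_indices = Vec::new();
    let mut right_indices = Vec::new();
    for (right_row, key) in right_keys.iter().enumerate() {
        match left.get(key) {
            Some(rows) => {
                for &left_row in rows {
                    left_indices.push(Some(left_row));
                    right_indices.push(right_row as u32);
                }
            }
            None => {
                left_indices.push(None);
                right_indices.push(right_row as u32);
            }
        }
    }
    (left_indices, right_indices)
}
```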







[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (bc5bdfa) into [master](https://codecov.io/gh/apache/arrow/commit/4b7cdcb9220b6d94b251aef32c21ef9b4097ecfa?el=desc) (4b7cdcb) will **decrease** coverage by `0.00%`.
   > The diff coverage is `92.18%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.61%   82.61%   -0.01%     
   ==========================================
     Files         203      203              
     Lines       50140    50152      +12     
   ==========================================
   + Hits        41422    41431       +9     
   - Misses       8718     8721       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `89.04% <92.06%> (-0.49%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [4b7cdcb...bc5bdfa](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove commented on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-754694469


   @Dandandan Yes, I personally think this is fine for now (hence the approval) and since no-one has objected I think we can go ahead and merge this.





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r550961536



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -297,69 +340,28 @@ impl RecordBatchStream for HashJoinStream {
 /// *
 fn build_batch_from_indices(
     schema: &Schema,
-    left: &Vec<RecordBatch>,
+    left: &RecordBatch,
     right: &RecordBatch,
-    join_type: &JoinType,
-    indices: &[(JoinIndex, RightIndex)],
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    column_indices: &Vec<ColumnIndex>,
 ) -> ArrowResult<RecordBatch> {
-    if left.is_empty() {
-        todo!("Create empty record batch");
-    }
-
-    let (primary_is_left, primary_schema, secondary_schema) = match join_type {
-        JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()),
-        JoinType::Right => (false, right.schema(), left[0].schema()),
-    };
-
     // build the columns of the new [RecordBatch]:
     // 1. pick whether the column is from the left or right
-    // 2. based on the pick, `take` items from the different recordBatches
+    // 2. based on the pick, `take` items from the different RecordBatches
     let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());
 
-    let right_indices: UInt32Array =
-        indices.iter().map(|(_, join_index)| join_index).collect();
-
-    for field in schema.fields() {
-        // pick the column (left or right) based on the field name.
-        let (is_primary, column_index) = match primary_schema.index_of(field.name()) {
-            Ok(i) => Ok((true, i)),
-            Err(_) => {
-                match secondary_schema.index_of(field.name()) {
-                    Ok(i) => Ok((false, i)),
-                    _ => Err(DataFusionError::Internal(
-                        format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string()
-                    ))
-                }
-            }
-        }.map_err(DataFusionError::into_arrow_external_error)?;
-
-        let is_left =
-            (is_primary && primary_is_left) || (!is_primary && !primary_is_left);
-
-        let array = if is_left {
-            // Note that we take `.data_ref()` to gather the [ArrayData] of each array.
-            let arrays = left
-                .iter()
-                .map(|batch| batch.column(column_index).data_ref().as_ref())
-                .collect::<Vec<_>>();
-
-            let mut mutable = MutableArrayData::new(arrays, true, indices.len());
-            // use the left indices
-            for (join_index, _) in indices {
-                match join_index {
-                    Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
-                    None => mutable.extend_nulls(1),
-                }
-            }
-            make_array(Arc::new(mutable.freeze()))
+    for column_index in column_indices {
+        let array = if column_index.is_left {
+            let array = left.column(column_index.index);
+            compute::take(array.as_ref(), &left_indices, None)?
         } else {
-            // use the right indices
-            let array = right.column(column_index);
+            let array = right.column(column_index.index);
             compute::take(array.as_ref(), &right_indices, None)?
         };
         columns.push(array);
     }
-    Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
+    RecordBatch::try_new(Arc::new(schema.clone()), columns)
 }
 
 /// Create a key `Vec<u8>` that is used as key for the hashmap

Review comment:
       https://issues.apache.org/jira/browse/ARROW-11112
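The new `build_batch_from_indices` boils down to: for each output column, look up whether it comes from the left or the right input, then `take` rows using the matching index array. Below is a std-only sketch with hypothetical `i64` columns and a simplified `take` without null handling (so it corresponds to the inner-join case); the `ColumnIndex { index, is_left }` shape mirrors the fields used in the diff, everything else is illustrative.

```rust
/// Where an output column comes from (mirrors the fields used in the diff).
#[derive(Debug, Clone, Copy)]
pub struct ColumnIndex {
    pub index: usize,
    pub is_left: bool,
}

/// Simplified `take`: gather `values[i]` for each index (panics if out of bounds).
fn take<I: Copy + Into<u64>>(values: &[i64], indices: &[I]) -> Vec<i64> {
    indices
        .iter()
        .map(|&i| {
            let row: u64 = i.into();
            values[row as usize]
        })
        .collect()
}

/// Build the output columns: left columns are gathered with the `u64` left
/// indices (into the single concatenated left batch), right columns with the
/// `u32` right indices.
pub fn build_columns_from_indices(
    left: &[Vec<i64>],
    right: &[Vec<i64>],
    left_indices: &[u64],
    right_indices: &[u32],
    column_indices: &[ColumnIndex],
) -> Vec<Vec<i64>> {
    column_indices
        .iter()
        .map(|c| {
            if c.is_left {
                take(&left[c.index], left_indices)
            } else {
                take(&right[c.index], right_indices)
            }
        })
        .collect()
}
```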







[GitHub] [arrow] andygrove edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753668963


   Yes @jorgecarleitao, reducing the number of batches makes sense here. I am just slightly concerned that always reducing to a single batch may be introducing a constraint that could bite us in the future, although this is realistically only an issue when using 32-bit offsets for variable-width arrays.





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551049086



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       What I meant is that the type of the index used in the hash join has no effect on the type used for the string array offsets. So the offset values inside the string array might be 64-bit, while here we use 32-bit indices to address the string elements, which would allow more than 4 GB? Anyway, at some point there is a limit we should keep in mind 👍 







[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551043968



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       Yes, but as far as I can tell, that only limits the maximum number of elements to 2^32 for each batch on the right?
   I think the string *data* per right record batch itself should be able to hold more than 4 GB because the string offsets are stored separately, or am I wrong about that?
   If this becomes a problem in the future, we can change it to use `u64` for the right indices as well, I think without too much extra cost besides double allocation/memory usage.







[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551052647



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       That would be less trivial to get rid of than I thought, but at some point you could make a case either for converting those string batches to use 64-bit indexes or for splitting the batches so that each stays below 2^32 in size, or something like that.







[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551052266



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       Ah I see what you mean now...







[GitHub] [arrow] andygrove commented on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753668963


   Yes @jorgecarleitao, reducing the number of batches makes sense here. I am just slightly concerned that always reducing to a single batch may be introducing a constraint that could bite us in the future, although this is realistically only an issue when using 32-bit offsets for variable-width arrays.





[GitHub] [arrow] Dandandan commented on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-754573137


   @jorgecarleitao @andygrove 
   Do you think this design is good enough for now?
   It means that, for the time being, anyone storing more than 4 GB of variable-size data in a build-side column should opt to use 64-bit offsets. In the future, one option would be to detect this automatically and apply a `cast` when needed. I think that would have the smallest performance impact.
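The "detect and cast" idea could be sketched like this: when concatenating build-side string columns that use 32-bit offsets, compute the total byte length first and keep 32-bit offsets only if they still fit, otherwise switch to 64-bit offsets (the layout Arrow calls `LargeUtf8`). Arrow's 32-bit offsets are signed, so the per-array limit is `i32::MAX` bytes. The `Offsets` enum and `concat_strings` function are hypothetical std-only stand-ins, not DataFusion's `cast` kernel, and each input is assumed to be unsliced (its offsets start at 0).

```rust
/// Offsets of a concatenated string column: 32-bit when they fit, else 64-bit.
#[derive(Debug, PartialEq)]
pub enum Offsets {
    Small(Vec<i32>),
    Large(Vec<i64>),
}

/// Concatenate string columns given as (offsets, values) pairs with `i32` offsets.
/// Assumes each part's offsets start at 0. `limit` would normally be
/// `i32::MAX as usize`; it is a parameter only to make the sketch testable.
pub fn concat_strings(parts: &[(Vec<i32>, Vec<u8>)], limit: usize) -> (Offsets, Vec<u8>) {
    let limit = limit.min(i32::MAX as usize);
    let total: usize = parts.iter().map(|(_, v)| v.len()).sum();
    let mut values = Vec::with_capacity(total);
    // Build 64-bit offsets first; they can always represent the result.
    let mut offsets: Vec<i64> = vec![0];
    for (part_offsets, part_values) in parts {
        let base = values.len() as i64;
        // Skip each part's leading 0 and shift the rest by the bytes already written.
        for &o in part_offsets.iter().skip(1) {
            offsets.push(base + o as i64);
        }
        values.extend_from_slice(part_values);
    }
    let offsets = if total <= limit {
        Offsets::Small(offsets.into_iter().map(|o| o as i32).collect())
    } else {
        Offsets::Large(offsets)
    };
    (offsets, values)
}
```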





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r550959638



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -119,6 +125,36 @@ impl HashJoinExec {
             build_side: Arc::new(Mutex::new(None)),
         })
     }
+
+    /// Calculates column indices and left/right placement on input / output schemas and jointype
+    fn column_indices_from_schema(&self) -> ArrowResult<Vec<ColumnIndex>> {

Review comment:
       This is now calculated upfront
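Computing the column placement once per `HashJoinExec`, rather than once per output batch, could look like the sketch below. It resolves each output field name against the two input schemas, checking the "primary" side first (left for inner/left joins, right for right joins), as the removed per-batch code did. Schemas are modelled as hypothetical slices of field names and the error type is a plain `String`; the real method works on Arrow `Schema`s and returns an `ArrowResult`.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ColumnIndex {
    pub index: usize,
    pub is_left: bool,
}

fn position(schema: &[&str], name: &str) -> Option<usize> {
    schema.iter().position(|f| *f == name)
}

/// For each output field, find which input it comes from and at which position.
pub fn column_indices_from_schema(
    output: &[&str],
    left: &[&str],
    right: &[&str],
    join_type: JoinType,
) -> Result<Vec<ColumnIndex>, String> {
    let left_is_primary = matches!(join_type, JoinType::Inner | JoinType::Left);
    let (first, second) = if left_is_primary { (left, right) } else { (right, left) };
    output
        .iter()
        .map(|&name| {
            let (index, from_first) = match position(first, name) {
                Some(i) => (i, true),
                None => match position(second, name) {
                    Some(i) => (i, false),
                    None => return Err(format!("column {} not found on either side of the join", name)),
                },
            };
            // `first` is the left side exactly when the left side is primary.
            Ok(ColumnIndex { index, is_left: from_first == left_is_primary })
        })
        .collect()
}
```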







[GitHub] [arrow] Dandandan edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753574211


   @andygrove @jorgecarleitao 
   
   This is now ready for review.
   I added some comments myself too.
   
   The PR now includes the following:
   
   * The main change: collect the left (build) side into a single batch and use `take` directly
   * Build info about column indices upfront
   * Speed improvement in `create_key` (most importantly, no longer calling `array.value(i)` twice); this also makes q1 and q5 run a bit faster
   * Misc refactoring
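To illustrate why a single batch simplifies the gather step, here is a toy, std-only model in which columns are plain `Vec`s; `concat` and `take` are hypothetical stand-ins for the Arrow kernels, not their real signatures. Once the build side is one contiguous column, a match is a single `u64` row index instead of a `(batch, row)` pair, and a missing match (`None`, as a left join can produce) becomes a null in the output, mirroring how Arrow's `take` treats null indices.

```rust
/// Concatenate per-batch columns into one contiguous column.
fn concat<T: Clone>(batches: &[Vec<T>]) -> Vec<T> {
    batches.concat()
}

/// Gather `values` at `indices`; a `None` index produces a null (`None`) output slot.
fn take<T: Clone>(values: &[T], indices: &[Option<u64>]) -> Vec<Option<T>> {
    indices
        .iter()
        .map(|idx| idx.map(|i| values[i as usize].clone()))
        .collect()
}
```

For example, concatenating `[10, 20]` and `[30]` and then taking indices `[2, null, 0]` yields `[30, null, 10]`.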
   
   





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r550960259



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -297,69 +340,28 @@ impl RecordBatchStream for HashJoinStream {
 /// *
 fn build_batch_from_indices(
     schema: &Schema,
-    left: &Vec<RecordBatch>,
+    left: &RecordBatch,
     right: &RecordBatch,
-    join_type: &JoinType,
-    indices: &[(JoinIndex, RightIndex)],
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    column_indices: &Vec<ColumnIndex>,
 ) -> ArrowResult<RecordBatch> {
-    if left.is_empty() {
-        todo!("Create empty record batch");
-    }
-
-    let (primary_is_left, primary_schema, secondary_schema) = match join_type {
-        JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()),
-        JoinType::Right => (false, right.schema(), left[0].schema()),
-    };
-
     // build the columns of the new [RecordBatch]:
     // 1. pick whether the column is from the left or right
-    // 2. based on the pick, `take` items from the different recordBatches
+    // 2. based on the pick, `take` items from the different RecordBatches
     let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());
 
-    let right_indices: UInt32Array =
-        indices.iter().map(|(_, join_index)| join_index).collect();
-
-    for field in schema.fields() {
-        // pick the column (left or right) based on the field name.
-        let (is_primary, column_index) = match primary_schema.index_of(field.name()) {
-            Ok(i) => Ok((true, i)),
-            Err(_) => {
-                match secondary_schema.index_of(field.name()) {
-                    Ok(i) => Ok((false, i)),
-                    _ => Err(DataFusionError::Internal(
-                        format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string()
-                    ))
-                }
-            }
-        }.map_err(DataFusionError::into_arrow_external_error)?;
-
-        let is_left =
-            (is_primary && primary_is_left) || (!is_primary && !primary_is_left);
-
-        let array = if is_left {
-            // Note that we take `.data_ref()` to gather the [ArrayData] of each array.
-            let arrays = left
-                .iter()
-                .map(|batch| batch.column(column_index).data_ref().as_ref())
-                .collect::<Vec<_>>();
-
-            let mut mutable = MutableArrayData::new(arrays, true, indices.len());
-            // use the left indices
-            for (join_index, _) in indices {
-                match join_index {
-                    Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
-                    None => mutable.extend_nulls(1),
-                }
-            }
-            make_array(Arc::new(mutable.freeze()))
+    for column_index in column_indices {
+        let array = if column_index.is_left {
+            let array = left.column(column_index.index);
+            compute::take(array.as_ref(), &left_indices, None)?
         } else {
-            // use the right indices
-            let array = right.column(column_index);
+            let array = right.column(column_index.index);
             compute::take(array.as_ref(), &right_indices, None)?
         };
         columns.push(array);
     }
-    Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
+    RecordBatch::try_new(Arc::new(schema.clone()), columns)
 }
 
 /// Create a key `Vec<u8>` that is used as key for the hashmap

Review comment:
       Currently, creating this key and hashing it looks like the most expensive part of the queries.
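The sketch below models, with std only, the kind of per-row work a byte-key approach implies: every row is serialized into a `Vec<u8>` before it can be hashed and compared. `Column`, `create_key` and `build_hash_map` are hypothetical simplifications, not DataFusion's code. The key buffer is reused across rows, but each map entry still needs its own owned copy. Length-prefixing strings keeps multi-column keys unambiguous (so `("ab", "c")` and `("a", "bc")` produce different keys).

```rust
use std::collections::HashMap;

/// Hypothetical, simplified key column types.
enum Column<'a> {
    Int64(&'a [i64]),
    Utf8(&'a [&'a str]),
}

/// Serialize the key columns of `row` into `key` (cleared first, then refilled).
fn create_key(columns: &[Column], row: usize, key: &mut Vec<u8>) {
    key.clear();
    for col in columns {
        match col {
            Column::Int64(v) => key.extend_from_slice(&v[row].to_le_bytes()),
            Column::Utf8(v) => {
                let s = v[row]; // read the value once
                key.extend_from_slice(&(s.len() as u64).to_le_bytes());
                key.extend_from_slice(s.as_bytes());
            }
        }
    }
}

/// Map each distinct key to the build-side rows that have it.
fn build_hash_map(columns: &[Column], num_rows: usize) -> HashMap<Vec<u8>, Vec<u64>> {
    let mut map: HashMap<Vec<u8>, Vec<u64>> = HashMap::new();
    let mut key = Vec::new();
    for row in 0..num_rows {
        create_key(columns, row, &mut key);
        // Each entry needs an owned copy of the key bytes.
        map.entry(key.clone()).or_default().push(row as u64);
    }
    map
}
```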







[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551052983



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       Thanks for the explanation @andygrove 👍 







[GitHub] [arrow] Dandandan commented on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concat to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313633


   FYI @jorgecarleitao @andygrove 





[GitHub] [arrow] jorgecarleitao commented on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753662117


   @andygrove, isn't this going in the direction we were discussing in https://issues.apache.org/jira/browse/ARROW-11058 with respect to the current lack of benefits of having multiple batches (though you just mentioned one benefit: indexing size)?
   





[GitHub] [arrow] Dandandan commented on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-755244535


   FYI, I am planning to work on the first changes for https://issues.apache.org/jira/browse/ARROW-11112, based on this PR/branch, later this week, to start getting rid of the hash key creation and to prepare the algorithm for a vectorized implementation.
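As background, one common way to avoid per-row key construction is to hash whole columns at once into a buffer of `u64` hashes, combining columns row by row; equality of the actual values must then still be checked on hash collisions. This std-only sketch uses `DefaultHasher` and an arbitrary combine function purely for illustration; `hash_value`, `combine` and `hash_column` are hypothetical names, and this is not the ARROW-11112 implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash one value with the std SipHash-based hasher.
fn hash_value<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Mix a new column's hash into the running row hash (illustrative choice).
fn combine(acc: u64, h: u64) -> u64 {
    acc.rotate_left(5) ^ h
}

/// Update `hashes[row]` with the hash of `values[row]` for every row in one pass.
fn hash_column<T: Hash>(values: &[T], hashes: &mut [u64]) {
    for (h, v) in hashes.iter_mut().zip(values) {
        *h = combine(*h, hash_value(v));
    }
}
```

Processing one column at a time keeps each inner loop over a single, homogeneous type, which is what makes this style amenable to vectorization, unlike building a heterogeneous byte key per row.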





[GitHub] [arrow] andygrove commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551041712



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       This change does mean that we can only use this join operator when the data fits into a single batch. For example, with 32-bit offsets this limits us to about 2 GB (2^31 - 1 bytes) of string data in a single column.
   
   I don't know whether any of our users care about this scenario.







[GitHub] [arrow] codecov-io commented on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concat to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (54b9c3a) into [master](https://codecov.io/gh/apache/arrow/commit/cc0ee5efcf9f6a67bcc407a11e8553a0409275c1?el=desc) (cc0ee5e) will **increase** coverage by `0.00%`.
   > The diff coverage is `88.57%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #9070   +/-   ##
   =======================================
     Coverage   82.55%   82.55%           
   =======================================
     Files         203      203           
     Lines       50043    50055   +12     
   =======================================
   + Hits        41313    41323   +10     
   - Misses       8730     8732    +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `89.71% <88.23%> (-0.23%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [cc0ee5e...54b9c3a](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (2418113) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **decrease** coverage by `0.00%`.
   > The diff coverage is `84.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.59%   82.58%   -0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50181      +12     
   ==========================================
   + Hits        41436    41444       +8     
   - Misses       8733     8737       +4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `85.97% <83.78%> (-0.36%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...6d182c9](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551050010



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       Also FYI, for the left side we are now using 64-bit indices, so the 2^32 limit should currently apply *only* to the number of elements per right batch, which seems reasonable to me (and that constraint would be easy to remove). On the other hand, a future way to save some memory could be to use 32-bit indices for the left side whenever there are fewer than 2^32 items on the build side.
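A minimal sketch of that memory-saving idea, assuming the matched build-side rows are first collected as `u64`; `LeftIndices`, `from_rows` and `byte_size` are hypothetical names. When the build side has fewer than 2^32 rows, every index fits in a `u32`, which halves the size of the index buffer.

```rust
/// Hypothetical left-side index buffer whose width depends on the build-side size.
#[derive(Debug, PartialEq)]
enum LeftIndices {
    U32(Vec<u32>),
    U64(Vec<u64>),
}

impl LeftIndices {
    /// Narrow to `u32` only when every possible index is below 2^32.
    fn from_rows(rows: &[u64], num_build_rows: u64) -> Self {
        if num_build_rows < (1u64 << 32) {
            LeftIndices::U32(rows.iter().map(|&r| r as u32).collect())
        } else {
            LeftIndices::U64(rows.to_vec())
        }
    }

    /// Bytes used by the index values (ignoring `Vec` overhead).
    fn byte_size(&self) -> usize {
        match self {
            LeftIndices::U32(v) => v.len() * std::mem::size_of::<u32>(),
            LeftIndices::U64(v) => v.len() * std::mem::size_of::<u64>(),
        }
    }
}
```

The decision depends only on the build-side row count, which is known once the left side has been collected, so it could be made once per join rather than per probe batch.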







[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (c96ba9f) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **decrease** coverage by `0.00%`.
   > The diff coverage is `85.71%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.59%   82.59%   -0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50177       +8     
   ==========================================
   + Hits        41436    41442       +6     
   - Misses       8733     8735       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `86.08% <85.52%> (-0.24%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...c96ba9f](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (797ffb6) into [master](https://codecov.io/gh/apache/arrow/commit/4b7cdcb9220b6d94b251aef32c21ef9b4097ecfa?el=desc) (4b7cdcb) will **decrease** coverage by `0.00%`.
   > The diff coverage is `92.18%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.61%   82.61%   -0.01%     
   ==========================================
     Files         203      203              
     Lines       50140    50152      +12     
   ==========================================
   + Hits        41422    41431       +9     
   - Misses       8718     8721       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `89.04% <92.06%> (-0.49%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [4b7cdcb...797ffb6](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753574211


   @andygrove @jorgecarleitao 
   
   This is now ready for review.
   I added some comments myself too.
   
   The PR now includes the following:
   
   * The main change: collect the left (build) side into a single batch and use `take` directly
   * Build info about column indexes upfront
   * Speed improvement in `create_key` (most importantly, no longer calling `row.value(i)` twice); this also makes q1 and q5 run a bit faster
   * Misc refactoring
   
   





[GitHub] [arrow] andygrove commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551048129



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       The Arrow spec supports both 32-bit and 64-bit variants of the offset vector, so in the 32-bit case each batch would be limited to 2^31 - 1 bytes (about 2 GB) of variable-width data per array, because offsets are signed integers.
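Because the Arrow columnar format defines offsets as signed integers, the largest value an offset can hold, and hence the maximum number of data bytes in one variable-width array, works out to:

$$
\text{32-bit offsets: } 2^{31} - 1 = 2{,}147{,}483{,}647 \text{ bytes} \approx 2\ \text{GiB},
\qquad
\text{64-bit offsets: } 2^{63} - 1 \approx 9.2 \times 10^{18} \text{ bytes}
$$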
   
   







[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (1511f0e) into [master](https://codecov.io/gh/apache/arrow/commit/cc0ee5efcf9f6a67bcc407a11e8553a0409275c1?el=desc) (cc0ee5e) will **increase** coverage by `0.00%`.
   > The diff coverage is `92.06%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #9070   +/-   ##
   =======================================
     Coverage   82.55%   82.55%           
   =======================================
     Files         203      203           
     Lines       50043    50055   +12     
   =======================================
   + Hits        41313    41324   +11     
   - Misses       8730     8731    +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `89.42% <91.93%> (-0.52%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   | [rust/parquet/src/arrow/schema.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9zY2hlbWEucnM=) | `90.93% <0.00%> (+0.31%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [cc0ee5e...1511f0e](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551040853



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;

Review comment:
       It should probably go into the arrow crate since it isn't specific to DataFusion. 
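Concatenating batches that share a schema is a generic columnar operation, which is why it need not live in DataFusion. Below is a minimal std-only sketch of the two steps the PR relies on: concatenating the left-side batches column by column, then gathering rows by flat index (the role Arrow's `take` kernel plays). It assumes a hypothetical `Batch` type whose columns are all `Vec<i64>`. This is not the arrow crate's `RecordBatch` API.

```rust
/// Hypothetical stand-in for a record batch: every column is a `Vec<i64>`
/// and all columns have the same length. Not the arrow crate's type.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub columns: Vec<Vec<i64>>,
}

impl Batch {
    pub fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

/// Concatenate batches with the same number of columns into one batch,
/// preallocating each output column to the total row count.
pub fn concat_batches(batches: &[Batch]) -> Result<Batch, String> {
    let num_columns = match batches.first() {
        Some(b) => b.columns.len(),
        None => return Ok(Batch { columns: vec![] }),
    };
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    let mut columns: Vec<Vec<i64>> = (0..num_columns)
        .map(|_| Vec::with_capacity(total_rows))
        .collect();
    for batch in batches {
        if batch.columns.len() != num_columns {
            return Err("batches have different numbers of columns".to_string());
        }
        for (out, col) in columns.iter_mut().zip(&batch.columns) {
            out.extend_from_slice(col);
        }
    }
    Ok(Batch { columns })
}

/// Gather rows of a single batch by flat row index (panics on an
/// out-of-range index), similar in spirit to a `take` kernel.
pub fn take(batch: &Batch, indices: &[u64]) -> Batch {
    let columns: Vec<Vec<i64>> = batch
        .columns
        .iter()
        .map(|col| indices.iter().map(|&i| col[i as usize]).collect::<Vec<i64>>())
        .collect();
    Batch { columns }
}
```

Because the left side becomes one batch, a single `u64` per match is enough to address a row, instead of a `(batch, row)` pair.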







[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r550959005



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;

Review comment:
       Is there a place to put this?







[GitHub] [arrow] Dandandan edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753574211


   @andygrove @jorgecarleitao 
   
   This is now ready for review.
   I also added some review comments myself.
   
   The PR now includes the following:
   
   * The main change: collect the left side into a single batch and use `take` directly
   * Build the column index info upfront
   * A speed improvement in `create_key` (most importantly, no longer calling `row.value(i)` twice); this also makes q1 and q5 run a bit faster
   * Misc refactoring
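The `create_key` improvement amounts to looking up each string value once and reusing it for both the length prefix and the bytes. A std-only sketch, assuming a hypothetical `KeyColumn` enum in place of Arrow arrays, serializes one row's key into a reused buffer:

```rust
/// Hypothetical, simplified key column: 64-bit integers or strings.
pub enum KeyColumn {
    Int64(Vec<i64>),
    Utf8(Vec<String>),
}

/// Serialize the key values of `row` into `key`, reusing its allocation.
/// Strings are length-prefixed so ("ab", "c") and ("a", "bc") differ.
pub fn create_key(columns: &[KeyColumn], row: usize, key: &mut Vec<u8>) {
    key.clear();
    for col in columns {
        match col {
            KeyColumn::Int64(values) => {
                key.extend_from_slice(&values[row].to_le_bytes());
            }
            KeyColumn::Utf8(values) => {
                // Fetch the value once instead of calling the accessor twice.
                let value: &str = &values[row];
                key.extend_from_slice(&value.len().to_le_bytes());
                key.extend_from_slice(value.as_bytes());
            }
        }
    }
}
```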
   
   





[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (69d3286) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **decrease** coverage by `0.00%`.
   > The diff coverage is `85.71%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.59%   82.59%   -0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50180      +11     
   ==========================================
   + Hits        41436    41445       +9     
   - Misses       8733     8735       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `86.19% <85.52%> (-0.13%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...c96ba9f](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r550959459



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -479,37 +490,40 @@ fn build_batch(
 fn build_join_indexes(
     left: &JoinHashMap,
     right: &RecordBatch,
-    join_type: &JoinType,
+    join_type: JoinType,
     right_on: &HashSet<String>,
-) -> Result<Vec<(JoinIndex, RightIndex)>> {
+) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = right_on
         .iter()
         .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows())))
         .collect::<Result<Vec<_>>>()?;
 
     let mut key = Vec::with_capacity(keys_values.len());
 
+    let mut left_indices = UInt64Builder::new(0);
+    let mut right_indices = UInt32Builder::new(0);
+
     match join_type {
         JoinType::Inner => {
-            let mut indexes = Vec::new(); // unknown a prior size
-
             // Visit all of the right rows
             for row in 0..right.num_rows() {
                 // Get the key and find it in the build index
                 create_key(&keys_values, row, &mut key)?;
                 let left_indexes = left.get(&key);
-
                 // for every item on the left and right with this key, add the respective pair
-                left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
-                    // on an inner join, left and right indices are present
-                    indexes.push((Some(*x), Some(row as u32)));
-                })
+
+                if let Some(indices) = left_indexes {
+                    left_indices.append_slice(&indices)?;
+
+                    for _ in 0..indices.len() {
+                        // on an inner join, left and right indices are present
+                        right_indices.append_value(row as u32)?;

Review comment:
       this could use something like fill

##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -479,37 +490,40 @@ fn build_batch(
 fn build_join_indexes(
     left: &JoinHashMap,
     right: &RecordBatch,
-    join_type: &JoinType,
+    join_type: JoinType,
     right_on: &HashSet<String>,
-) -> Result<Vec<(JoinIndex, RightIndex)>> {
+) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = right_on
         .iter()
         .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows())))
         .collect::<Result<Vec<_>>>()?;
 
     let mut key = Vec::with_capacity(keys_values.len());
 
+    let mut left_indices = UInt64Builder::new(0);
+    let mut right_indices = UInt32Builder::new(0);
+
     match join_type {
         JoinType::Inner => {
-            let mut indexes = Vec::new(); // unknown a prior size
-
             // Visit all of the right rows
             for row in 0..right.num_rows() {
                 // Get the key and find it in the build index
                 create_key(&keys_values, row, &mut key)?;
                 let left_indexes = left.get(&key);
-
                 // for every item on the left and right with this key, add the respective pair
-                left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
-                    // on an inner join, left and right indices are present
-                    indexes.push((Some(*x), Some(row as u32)));
-                })
+
+                if let Some(indices) = left_indexes {
+                    left_indices.append_slice(&indices)?;
+
+                    for _ in 0..indices.len() {
+                        // on an inner join, left and right indices are present
+                        right_indices.append_value(row as u32)?;

Review comment:
       this could use something like append_n
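The suggestion is to replace the per-element loop with one bulk append. A std-only sketch of the inner-join branch of `build_join_indexes`, assuming a hypothetical `JoinHashMap` from serialized key to left row indices (std `HashMap` instead of `ahash`) and already serialized right-side keys, uses `Vec::extend` with `iter::repeat` where an `append_n`-style builder method would go. `Vec::resize` would be the "fill" variant.

```rust
use std::collections::HashMap;
use std::iter;

/// Hypothetical build-side index: serialized key -> row indices into the
/// single concatenated left batch.
pub type JoinHashMap = HashMap<Vec<u8>, Vec<u64>>;

/// Inner-join (left, right) index pairs for every probe row, where
/// `right_keys[row]` is the serialized key of right-side row `row`.
pub fn build_inner_join_indexes(
    left: &JoinHashMap,
    right_keys: &[Vec<u8>],
) -> (Vec<u64>, Vec<u32>) {
    let mut left_indices = Vec::new();
    let mut right_indices = Vec::new();
    for (row, key) in right_keys.iter().enumerate() {
        if let Some(indices) = left.get(key) {
            left_indices.extend_from_slice(indices);
            // One bulk append of `row`, repeated once per left match.
            // Note: `row as u32` truncates if there are more than u32::MAX rows.
            right_indices.extend(iter::repeat(row as u32).take(indices.len()));
        }
    }
    (left_indices, right_indices)
}
```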







[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (33b96f3) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **increase** coverage by `0.01%`.
   > The diff coverage is `85.71%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   + Coverage   82.59%   82.60%   +0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50197      +28     
   ==========================================
   + Hits        41436    41465      +29     
   + Misses       8733     8732       -1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `86.08% <85.52%> (-0.24%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   | [rust/arrow/src/array/array\_string.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvYXJyYXlfc3RyaW5nLnJz) | `89.94% <0.00%> (-0.22%)` | :arrow_down: |
   | [rust/arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvYXJyYXkvdHJhbnNmb3JtL21vZC5ycw==) | `89.01% <0.00%> (+0.72%)` | :arrow_up: |
   | [rust/arrow/src/ffi.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9hcnJvdy9zcmMvZmZpLnJz) | `72.22% <0.00%> (+2.22%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...33b96f3](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551049086



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       What I meant is that the type of the index used in the hash join has no effect on the type used for the string array offsets. So the offset values inside the string array might be 64-bit, while here we use 32-bit indices into the string elements, which would allow more than 4GB? Anyway, at some point there is a limit we should keep in mind 👍 
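The two widths are independent limits. Row indices bound the number of rows. Offsets bound the total string bytes: in Arrow, `StringArray` uses `i32` offsets (at most `i32::MAX` bytes) and `LargeStringArray` uses `i64` offsets. A std-only sketch with hypothetical helper names checks both limits explicitly instead of truncating with `as`:

```rust
use std::convert::TryFrom;

/// Convert a row position into a 32-bit index, failing instead of
/// silently truncating the way `row as u32` would.
pub fn to_u32_index(row: usize) -> Result<u32, String> {
    u32::try_from(row).map_err(|_| format!("row {} does not fit in a u32 index", row))
}

/// Check a string column against both limits: the row count must fit a
/// u32, and the total byte length (the last offset) must fit an i32.
pub fn check_string_column_limits(values: &[&str]) -> Result<(u32, i32), String> {
    let rows = to_u32_index(values.len())?;
    let total_bytes: usize = values.iter().map(|v| v.len()).sum();
    let last_offset = i32::try_from(total_bytes)
        .map_err(|_| format!("{} bytes exceed i32 offsets", total_bytes))?;
    Ok((rows, last_offset))
}
```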







[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (2db270c) into [master](https://codecov.io/gh/apache/arrow/commit/cc0ee5efcf9f6a67bcc407a11e8553a0409275c1?el=desc) (cc0ee5e) will **decrease** coverage by `0.00%`.
   > The diff coverage is `86.11%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.55%   82.55%   -0.01%     
   ==========================================
     Files         203      203              
     Lines       50043    50056      +13     
   ==========================================
   + Hits        41313    41323      +10     
   - Misses       8730     8733       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `89.45% <85.71%> (-0.49%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [cc0ee5e...2db270c](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (bbec070) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **decrease** coverage by `0.00%`.
   > The diff coverage is `85.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.59%   82.59%   -0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50179      +10     
   ==========================================
   + Hits        41436    41443       +7     
   - Misses       8733     8736       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `86.16% <85.33%> (-0.17%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...6d182c9](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] Dandandan commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r551052647



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       That would be less trivial to get rid of than I thought, but at some point you could make a case either for converting those string arrays to use 64-bit indexes or for splitting the batches to be at most 2^32 in size, or something like that.
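The splitting option can be sketched as computing contiguous `(offset, len)` ranges of bounded size. Each range could then be passed to whatever slicing routine the batch type offers. The helper below is hypothetical and std-only. For a 32-bit row limit, `max_rows` would be `u32::MAX as usize + 1`.

```rust
/// Split `num_rows` rows into contiguous (offset, len) ranges, each
/// containing at most `max_rows` rows. Panics if `max_rows` is zero.
pub fn chunk_ranges(num_rows: usize, max_rows: usize) -> Vec<(usize, usize)> {
    assert!(max_rows > 0, "max_rows must be positive");
    (0..num_rows)
        .step_by(max_rows)
        .map(|offset| (offset, max_rows.min(num_rows - offset)))
        .collect()
}
```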







[GitHub] [arrow] alamb commented on a change in pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#discussion_r553344085



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<u32>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;

Review comment:
       FWIW I would love to see a join approach for larger datasets that doesn't require giant buffers (e.g. 64-bit indexes) but instead switches over at runtime to a sort-merge join or its equivalent. But that is a pipe dream for me at the moment. 
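A sort-merge join avoids a hash table over the whole build side. Once both inputs are sorted on the key, it walks them in step. The textbook sketch below shows only the merge step, on in-memory key slices assumed to be sorted ascending. It is not from this PR or DataFusion and does not implement the runtime switch. It emits the cross product for runs of duplicate keys.

```rust
use std::cmp::Ordering;

/// Inner sort-merge join over two ascending-sorted key slices, returning
/// (left_index, right_index) pairs for every match.
pub fn sort_merge_join<K: Ord>(left: &[K], right: &[K]) -> Vec<(usize, usize)> {
    let mut out = Vec::new();
    let (mut i, mut j) = (0, 0);
    while i < left.len() && j < right.len() {
        match left[i].cmp(&right[j]) {
            Ordering::Less => i += 1,
            Ordering::Greater => j += 1,
            Ordering::Equal => {
                // Find the run of equal keys on each side.
                let i_end = i + left[i..].iter().take_while(|k| **k == left[i]).count();
                let j_end = j + right[j..].iter().take_while(|k| **k == right[j]).count();
                for li in i..i_end {
                    for rj in j..j_end {
                        out.push((li, rj));
                    }
                }
                i = i_end;
                j = j_end;
            }
        }
    }
    out
}
```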







[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (d58abf6) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **decrease** coverage by `0.00%`.
   > The diff coverage is `85.71%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.59%   82.59%   -0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50180      +11     
   ==========================================
   + Hits        41436    41444       +8     
   - Misses       8733     8736       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `86.19% <85.52%> (-0.13%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...69d3286](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] codecov-io edited a comment on pull request #9070: ARROW-11030: [Rust][DataFusion] [WIP] Concatenate left side batches to single batch in HashJoinExec

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #9070:
URL: https://github.com/apache/arrow/pull/9070#issuecomment-753313211


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=h1) Report
   > Merging [#9070](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=desc) (0efb5a7) into [master](https://codecov.io/gh/apache/arrow/commit/5228ede9abf8ecf9b4bb68a06075cd16af3523e9?el=desc) (5228ede) will **decrease** coverage by `0.00%`.
   > The diff coverage is `85.13%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9070/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9070      +/-   ##
   ==========================================
   - Coverage   82.59%   82.59%   -0.01%     
   ==========================================
     Files         204      204              
     Lines       50169    50179      +10     
   ==========================================
   + Hits        41436    41443       +7     
   - Misses       8733     8736       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `86.16% <84.93%> (-0.17%)` | :arrow_down: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `88.23% <100.00%> (ø)` | |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/9070/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=footer). Last update [5228ede...6d182c9](https://codecov.io/gh/apache/arrow/pull/9070?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

