You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/25 14:28:43 UTC

[GitHub] [arrow-datafusion] liukun4515 commented on a diff in pull request #4377: Refactor the Hash Join

liukun4515 commented on code in PR #4377:
URL: https://github.com/apache/arrow-datafusion/pull/4377#discussion_r1032506015


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1482,105 +1325,115 @@ impl HashJoinStream {
 
         let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
             let num_rows = left_data.1.num_rows();
-            match self.join_type {
-                JoinType::Left
-                | JoinType::Full
-                | JoinType::LeftSemi
-                | JoinType::LeftAnti => {
-                    let mut buffer = BooleanBufferBuilder::new(num_rows);
-
-                    buffer.append_n(num_rows, false);
-
-                    buffer
-                }
-                JoinType::Inner
-                | JoinType::Right
-                | JoinType::RightSemi
-                | JoinType::RightAnti => BooleanBufferBuilder::new(0),
+            if need_produce_result_in_final(self.join_type) {
+                // these join type need the bitmap to identify which row has be matched or unmatched.
+                // For the `left semi` join, need to use the bitmap to produce the matched row in the left side
+                // For the `left` join, need to use the bitmap to produce the unmatched row in the left side with null
+                // For the `left anti` join, need to use the bitmap to produce the unmatched row in the left side
+                // For the `full` join, need to use the bitmap to produce the unmatched row in the left side with null
+                let mut buffer = BooleanBufferBuilder::new(num_rows);
+                buffer.append_n(num_rows, false);
+                buffer
+            } else {
+                BooleanBufferBuilder::new(0)
             }
         });
 
         self.right
             .poll_next_unpin(cx)
             .map(|maybe_batch| match maybe_batch {
+                // one right batch in the join loop
                 Some(Ok(batch)) => {
+                    self.join_metrics.input_batches.add(1);
+                    self.join_metrics.input_rows.add(batch.num_rows());
                     let timer = self.join_metrics.join_time.timer();
-                    let result = build_batch(
+
+                    // get the matched two indices for the on condition
+                    let left_right_indices = build_join_indices(
                         &batch,
                         left_data,
                         &self.on_left,
                         &self.on_right,
                         &self.filter,
-                        self.join_type,
-                        &self.schema,
-                        &self.column_indices,
                         &self.random_state,
                         &self.null_equals_null,
                     );
-                    self.join_metrics.input_batches.add(1);
-                    self.join_metrics.input_rows.add(batch.num_rows());
-                    if let Ok((ref batch, ref left_side)) = result {
-                        timer.done();
-                        self.join_metrics.output_batches.add(1);
-                        self.join_metrics.output_rows.add(batch.num_rows());
-
-                        match self.join_type {
-                            JoinType::Left
-                            | JoinType::Full
-                            | JoinType::LeftSemi
-                            | JoinType::LeftAnti => {
+
+                    match left_right_indices {
+                        Ok((left_side, right_side)) => {
+                            timer.done();
+
+                            // set the left bitmap
+                            // and only left, full, left semi, left anti need the left bitmap
+                            if need_produce_result_in_final(self.join_type) {
                                 left_side.iter().flatten().for_each(|x| {
                                     visited_left_side.set_bit(x as usize, true);
                                 });
                             }
-                            JoinType::Inner
-                            | JoinType::Right
-                            | JoinType::RightSemi
-                            | JoinType::RightAnti => {}
+
+                            // adjust the two side indices base on the join type
+                            let (left_side, right_side) = adjust_indices_by_join_type(
+                                left_side,
+                                right_side,
+                                batch.num_rows(),
+                                self.join_type,
+                            );
+
+                            let result = build_batch_from_indices(
+                                &self.schema,
+                                &left_data.1,
+                                &batch,
+                                left_side,
+                                right_side,
+                                &self.column_indices,
+                            );
+                            self.join_metrics.output_batches.add(1);
+                            self.join_metrics.output_rows.add(batch.num_rows());
+                            Some(result)
+                        }
+                        Err(_) => {
+                            // TODO why the type of result stream is `Result<T, ArrowError>`, and not the `DataFusionError`
+                            Some(Err(ArrowError::ComputeError(
+                                "Build left right indices error".to_string(),
+                            )))
                         }
                     }
-                    Some(result.map(|x| x.0))
                 }
+                // the right side has been consumed
+                // TODO: Some(Err) case

Review Comment:
   This TODO (the `Some(Err)` case) is resolved by https://github.com/apache/arrow-datafusion/pull/4373/files



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org