Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/06/22 07:38:12 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #6742: Order Preserving RepartitionExec Implementation

mustafasrepo opened a new pull request, #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742

   # Which issue does this PR close?
   
   Closes #6486.
   
   # Rationale for this change
   
   `RepartitionExec`, when handling multiple input partitions, creates `N` channels, where `N` is the output partition count. For each input partition, the sender (`tx`) of each of these channels is cloned. This results in `n_input * n_output` effective senders but only `n_output` receivers (`rx`s), since receivers are not cloned.
   
   During processing, each output partition pulls from its single receiver, which is fed by every input partition when there is more than one. Depending on processing time, batches from multiple input partitions are interleaved into that single receiver, which disrupts any existing ordering.
   
   This is particularly problematic when the input partition count is greater than 1, because it leads to an unpredictable order of records within each output partition. Although this behavior is acceptable for most use cases, some use cases need the existing ordering to be preserved during repartitioning.
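The following small deterministic simulation makes the interleaving concrete. It is not DataFusion code. It assumes each input partition is a pre-sorted `Vec<i64>`, and it models timing with a made-up `arrival` schedule: entry `k` is the input partition whose next value reaches the shared receiver at step `k`. `shared_receiver` and `is_sorted` are hypothetical helpers.

```rust
/// Simulates one output partition whose single shared receiver is fed by
/// several input partitions. `arrival[k]` names the input partition whose next
/// value happens to reach the receiver at step `k` (hypothetical timing).
fn shared_receiver(inputs: &[Vec<i64>], arrival: &[usize]) -> Vec<i64> {
    // Position of the next unsent value in each input partition.
    let mut cursors = vec![0usize; inputs.len()];
    let mut received = Vec::new();
    for &i in arrival {
        if let Some(&v) = inputs[i].get(cursors[i]) {
            received.push(v);
            cursors[i] += 1;
        }
    }
    received
}

/// True if the values are in non-decreasing order.
fn is_sorted(values: &[i64]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}
```

Take inputs `[1, 3, 5]` and `[2, 4, 6]`. The schedule `[1, 1, 0, 1, 0, 0]` yields `[2, 4, 1, 6, 3, 5]`. The schedule `[0, 1, 0, 1, 0, 1]` happens to yield a sorted result. The output order depends only on timing, even though each input is sorted.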
   
   # What changes are included in this PR?
   This PR adds `SortPreservingRepartitionExec`, which preserves ordering when its input is ordered. It creates a total of `n_input * n_output` channels (senders and receivers), so each channel is dedicated to sending data from one input partition to one output partition.
   
   As a result, each output partition has `n_input` receivers, one per input partition. Data from different input partitions is no longer interleaved, because there is no longer a single shared receiver. Each output partition can therefore merge the data from its receivers while preserving order, and so keeps its existing ordering.
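Below is a minimal sketch of the dedicated-channel idea, under these assumptions:

- `std::sync::mpsc` channels stand in for DataFusion's `DistributionSender` / `DistributionReceiver`.
- A `BinaryHeap`-based k-way merge stands in for `streaming_merge`.
- Routing on `value % 2` stands in for hash partitioning.

`dedicated_layout`, `merge_sorted` and `route_by_parity` are hypothetical names. Everything runs on one thread, whereas the operator uses one task per input partition.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// One dedicated channel per (input, output) pair:
/// `senders[i][o]` and `receivers[o][i]` are the two ends of the same channel.
fn dedicated_layout<T>(
    n_input: usize,
    n_output: usize,
) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {
    let mut senders: Vec<Vec<Sender<T>>> = (0..n_input).map(|_| Vec::new()).collect();
    let mut receivers: Vec<Vec<Receiver<T>>> = (0..n_output).map(|_| Vec::new()).collect();
    for input_senders in senders.iter_mut() {
        for output_receivers in receivers.iter_mut() {
            let (tx, rx) = channel();
            input_senders.push(tx);
            output_receivers.push(rx);
        }
    }
    (senders, receivers)
}

/// K-way merge of per-input receivers whose contents are already sorted.
/// `recv()` blocks, so this only finishes once every sender has been dropped.
fn merge_sorted(receivers: Vec<Receiver<i64>>) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the first value from each input partition.
    for (idx, rx) in receivers.iter().enumerate() {
        if let Ok(v) = rx.recv() {
            heap.push(Reverse((v, idx)));
        }
    }
    let mut merged = Vec::new();
    // Take the smallest head, then refill from the same input partition.
    while let Some(Reverse((v, idx))) = heap.pop() {
        merged.push(v);
        if let Ok(next) = receivers[idx].recv() {
            heap.push(Reverse((next, idx)));
        }
    }
    merged
}

/// Hypothetical hash-like routing on `value % 2` into 2 output partitions.
fn route_by_parity(inputs: &[Vec<i64>]) -> Vec<Vec<i64>> {
    let (senders, receivers) = dedicated_layout::<i64>(inputs.len(), 2);
    for (input_senders, input) in senders.iter().zip(inputs) {
        for &v in input {
            input_senders[v.rem_euclid(2) as usize].send(v).unwrap();
        }
    }
    // Close every channel so each merge can terminate.
    drop(senders);
    receivers.into_iter().map(merge_sorted).collect()
}
```

For example, inputs `[1, 2, 5, 6]` and `[3, 4, 7, 8]` produce output partitions `[2, 4, 6, 8]` and `[1, 3, 5, 7]`. Each output stays sorted because every receiver carries values from exactly one sorted input.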
   
   # Are these changes tested?
   Yes. Fuzz tests are added to check that ordering is preserved through the following physical plan (its input and output should be the same):
   ```text
   "SortPreservingMergeExec: [a@0 ASC,b@1 ASC,c@2 ASC]",
   "  SortPreservingRepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 2), input_partitions=2",
   "    SortPreservingRepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 2), input_partitions=1",
   "      MemoryExec: partitions=1, partition_sizes=[75]",
   ```
   Either repartition can also use round-robin partitioning, and the `MemoryExec` input has an existing ordering.
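A much smaller sketch can illustrate the property that the fuzz test checks. It makes several assumptions:

- Rows are plain `(a, b, c)` tuples instead of `RecordBatch`es.
- A hypothetical LCG replaces the `rand`-based generator.
- Only one repartition level is modeled.
- `c % 2` stands in for hash partitioning on `c`.

Sorting the tuples lexicographically matches `ORDER BY a, b, c` ascending. `Lcg`, `make_sorted_input`, `repartition_by_c`, `merge_two` and `round_trip_preserves_order` are made-up names.

```rust
/// Tiny deterministic generator (hypothetical stand-in for `rand`).
struct Lcg(u64);

impl Lcg {
    fn next_value(&mut self, n_distinct: u64) -> i64 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        ((self.0 >> 33) % n_distinct) as i64
    }
}

type Row = (i64, i64, i64); // (a, b, c)

/// Input sorted as if by `ORDER BY a, b, c`.
fn make_sorted_input(n_row: usize, n_distinct: u64, seed: u64) -> Vec<Row> {
    let mut rng = Lcg(seed);
    let mut rows: Vec<Row> = (0..n_row)
        .map(|_| (rng.next_value(n_distinct), rng.next_value(n_distinct), rng.next_value(n_distinct)))
        .collect();
    rows.sort();
    rows
}

/// Order-preserving split on column `c` into 2 partitions.
fn repartition_by_c(rows: &[Row]) -> [Vec<Row>; 2] {
    let mut parts: [Vec<Row>; 2] = [Vec::new(), Vec::new()];
    for &row in rows {
        parts[row.2.rem_euclid(2) as usize].push(row);
    }
    parts
}

/// Two-way merge of sorted partitions (stand-in for SortPreservingMergeExec).
fn merge_two(left: &[Row], right: &[Row]) -> Vec<Row> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(left.len() + right.len());
    while i < left.len() && j < right.len() {
        if left[i] <= right[j] {
            out.push(left[i]);
            i += 1;
        } else {
            out.push(right[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&left[i..]);
    out.extend_from_slice(&right[j..]);
    out
}

/// The property under test: repartition then merge gives back the input.
fn round_trip_preserves_order(n_row: usize, n_distinct: u64, seed: u64) -> bool {
    let input = make_sorted_input(n_row, n_distinct, seed);
    let parts = repartition_by_c(&input);
    merge_two(&parts[0], &parts[1]) == input
}
```

The real test also stacks two repartition levels and varies hash vs. round-robin partitioning. It runs many seeds concurrently as tokio tasks.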
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742



[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#discussion_r1239952787


##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements an order-preserving repartitioning operator
+//! mapping N input partitions to M output partitions.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::SendableRecordBatchStream;
+
+use crate::physical_plan::common::{
+    transpose, AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation,
+};
+use crate::physical_plan::metrics::BaselineMetrics;
+use crate::physical_plan::repartition::distributor_channels::{
+    partition_aware_channels, DistributionReceiver, DistributionSender,
+};
+use crate::physical_plan::repartition::{
+    pull_from_input, wait_for_task, MaybeBatch, RepartitionMetrics,
+};
+use crate::physical_plan::sorts::streaming_merge;
+use crate::physical_plan::{
+    DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+    RecordBatchStream, Statistics,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+use futures::{FutureExt, Stream};
+use hashbrown::HashMap;
+use log::trace;
+use parking_lot::Mutex;
+use tokio::task::JoinHandle;
+
+type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>;
+type InputPartitionsToCurrentPartitionReceiver = Vec<DistributionReceiver<MaybeBatch>>;
+
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {

Review Comment:
   I can't help noticing that a great deal of this file is almost the same as https://github.com/apache/arrow-datafusion/blob/80d47cce606ab7f22a28d1954aa38a0bda391eda/datafusion/core/src/physical_plan/repartition/mod.rs#L291-L484
   
   This code also has several non-trivial parts (memory tracking, yielding back to tokio, task management, etc.), much of which is covered by tests in https://github.com/apache/arrow-datafusion/blob/80d47cce606ab7f22a28d1954aa38a0bda391eda/datafusion/core/src/physical_plan/repartition/mod.rs#L685-L1216
   
   However, since the order-aware repartition doesn't use the same code (it uses a copy), many of those cases are not covered by the existing tests.
   
   I wonder if there is a way to avoid the copy/paste code and parameterize only what is different. Specifically, I think that means the construction of the `DistributionSender` / `DistributionReceiver` Vecs, and then how to combine the results if each receiver only has a single input.
   
   Is this something you tried? 
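The reviewer's suggestion can be sketched as a single builder in which only the channel wiring depends on a flag. This is a hypothetical illustration, not DataFusion code. `build_channels` and `preserve_order` are made-up names, and `std::sync::mpsc` stands in for `DistributionSender` / `DistributionReceiver`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical shared builder: `senders[i][o]` sends from input `i` to output `o`.
/// Each `receivers[o]` holds one shared receiver when `preserve_order` is false,
/// or `n_input` dedicated receivers when it is true.
fn build_channels<T>(
    n_input: usize,
    n_output: usize,
    preserve_order: bool,
) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {
    let mut senders: Vec<Vec<Sender<T>>> = (0..n_input).map(|_| Vec::new()).collect();
    let mut receivers: Vec<Vec<Receiver<T>>> = (0..n_output).map(|_| Vec::new()).collect();
    for output_receivers in receivers.iter_mut() {
        if preserve_order {
            // One dedicated channel per (input, output) pair.
            for input_senders in senders.iter_mut() {
                let (tx, rx) = channel();
                input_senders.push(tx);
                output_receivers.push(rx);
            }
        } else {
            // One channel per output partition; every input gets a sender clone.
            let (tx, rx) = channel();
            for input_senders in senders.iter_mut() {
                input_senders.push(tx.clone());
            }
            output_receivers.push(rx);
        }
    }
    (senders, receivers)
}
```

Under this design, everything downstream (spawning input tasks, memory reservations, metrics) could be shared. The only remaining difference would be whether an output partition reads its single receiver directly or merges its `n_input` receivers.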
   



##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {
+    /// Channels for sending batches from input partitions to output partitions.
+    /// Key is the partition number.
+    channels: HashMap<
+        usize,
+        (
+            InputPartitionsToCurrentPartitionSender,
+            InputPartitionsToCurrentPartitionReceiver,
+            SharedMemoryReservation,
+        ),
+    >,
+
+    /// Helper that ensures that the background job is killed once it is no longer needed.
+    abort_helper: Arc<AbortOnDropMany<()>>,
+}
+
+/// The repartition operator maps N input partitions to M output partitions based on a
+/// partitioning scheme. Any input ordering is preserved.
+#[derive(Debug)]
+pub struct SortPreservingRepartitionExec {
+    /// Input execution plan
+    input: Arc<dyn ExecutionPlan>,
+
+    /// Partitioning scheme to use
+    partitioning: Partitioning,
+
+    /// Inner state that is initialized when the first output stream is created.
+    state: Arc<Mutex<SortPreservingRepartitionExecState>>,
+
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl SortPreservingRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.partitioning
+    }
+}
+
+impl ExecutionPlan for SortPreservingRepartitionExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(SortPreservingRepartitionExec::try_new(
+            children[0].clone(),
+            self.partitioning.clone(),
+        )?))
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.partitioning.clone()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        // Indicate that input ordering is preserved:
+        self.input().output_ordering()
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true]
+    }
+
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start SortPreservingRepartitionExec::execute for partition: {}",
+            partition
+        );
+        // lock mutexes
+        let mut state = self.state.lock();
+
+        let num_input_partitions = self.input.output_partitioning().partition_count();
+        let num_output_partitions = self.partitioning.partition_count();
+
+        // if this is the first partition to be invoked then we need to set up initial state
+        if state.channels.is_empty() {
+            // create one channel per (*input*, *output*) partition pair
+            // note we use a custom channel that ensures there is always data for each receiver
+            // but limits the amount of buffering if required.
+            let (txs, rxs) =
+                partition_aware_channels(num_input_partitions, num_output_partitions);
+            // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition
+            let txs = transpose(txs);
+            let rxs = transpose(rxs);
+            for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
+                let reservation = Arc::new(Mutex::new(
+                    MemoryConsumer::new(format!(
+                        "SortPreservingRepartitionExec[{partition}]"
+                    ))
+                    .register(context.memory_pool()),
+                ));
+                state.channels.insert(partition, (tx, rx, reservation));
+            }
+
+            // launch one async task per *input* partition
+            let mut join_handles = Vec::with_capacity(num_input_partitions);
+            for i in 0..num_input_partitions {
+                let txs: HashMap<_, _> = state
+                    .channels
+                    .iter()
+                    .map(|(partition, (tx, _rx, reservation))| {
+                        (*partition, (tx[i].clone(), Arc::clone(reservation)))
+                    })
+                    .collect();
+                let r_metrics = RepartitionMetrics::new(i, partition, &self.metrics);
+
+                let input_task: JoinHandle<Result<()>> = tokio::spawn(pull_from_input(
+                    self.input.clone(),
+                    i,
+                    txs.clone(),
+                    self.partitioning.clone(),
+                    r_metrics,
+                    context.clone(),
+                ));
+
+                // In a separate task, wait for each input to be done
+                // (and pass along any errors, including panic!s)
+                let join_handle = tokio::spawn(wait_for_task(
+                    AbortOnDropSingle::new(input_task),
+                    txs.into_iter()
+                        .map(|(partition, (tx, _reservation))| (partition, tx))
+                        .collect(),
+                ));
+                join_handles.push(join_handle);
+            }
+
+            state.abort_helper = Arc::new(AbortOnDropMany(join_handles))
+        }
+
+        trace!(
+            "Before returning stream in SortPreservingRepartitionExec::execute for partition: {}",
+            partition
+        );
+
+        // now return stream for the specified *output* partition which will
+        // read from the channels
+        let (_tx, rx, reservation) = state
+            .channels
+            .remove(&partition)
+            .expect("partition not used yet");
+
+        // Store streams from all the input partitions:
+        let input_streams = rx
+            .into_iter()
+            .map(|receiver| {
+                Box::pin(PerPartitionStream {
+                    schema: self.schema(),
+                    receiver,
+                    drop_helper: Arc::clone(&state.abort_helper),
+                    reservation: reservation.clone(),
+                }) as SendableRecordBatchStream
+            })
+            .collect::<Vec<_>>();
+        // Note that the receiver count (`rx.len()`) equals `num_input_partitions`.
+
+        // Get existing ordering:
+        let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
+        // Merge streams (while preserving ordering) coming from input partitions to this partition:
+        streaming_merge(

Review Comment:
   ❤️ for the streaming merge (cc @tustvold, that was a great API)



##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+                // In a separate task, wait for each input to be done
+                // (and pass along any errors, including panic!s)
+                let join_handle = tokio::spawn(wait_for_task(

Review Comment:
   FWIW, there is now a better `JoinSet` abstraction for this in Tokio. I think @aprimadi is trying to rework the code to use it (e.g. https://github.com/apache/arrow-datafusion/pull/6750)
   
   If we were able to reduce the duplication between this code and the non-order-preserving repartition, I think this kind of cleanup would be easier.



##########
datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod sp_repartition_fuzz_tests {
+    use arrow::compute::concat_batches;
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+    use arrow_schema::SortOptions;
+    use datafusion::physical_plan::memory::MemoryExec;
+    use datafusion::physical_plan::repartition::sort_preserving_repartition::SortPreservingRepartitionExec;
+    use datafusion::physical_plan::repartition::RepartitionExec;
+    use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+    use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning};
+    use datafusion::prelude::SessionContext;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+    use rand::rngs::StdRng;
+    use rand::{Rng, SeedableRng};
+    use std::sync::Arc;
+    use test_utils::add_empty_batches;
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    async fn sort_preserving_repartition_test() {
+        let seed_start = 0;
+        let seed_end = 100;
+        let n_row = 1000;
+        // Since the ordering in the test (ORDER BY a,b,c) covers all
+        // columns of the table (a, b, c), the result doesn't depend on
+        // stable/unstable sort behaviour, so n_distinct can be chosen
+        // freely. We choose a large value to decrease the probability
+        // of having duplicate rows in the table.
+        let n_distinct = 1_000_000;
+        for (is_first_roundrobin, is_first_sort_preserving) in
+            [(false, false), (false, true), (true, false), (true, true)]
+        {
+            for is_second_roundrobin in [false, true] {
+                let mut handles = Vec::new();
+
+                for seed in seed_start..seed_end {
+                    let job = tokio::spawn(run_sort_preserving_repartition_test(
+                        make_staggered_batches::<true>(n_row, n_distinct, seed as u64),
+                        is_first_roundrobin,
+                        is_first_sort_preserving,
+                        is_second_roundrobin,
+                    ));
+                    handles.push(job);
+                }
+
+                for job in handles {
+                    job.await.unwrap();
+                }
+            }
+        }
+    }
+
+    /// Check whether physical plan below
+    ///     "SortPreservingMergeExec: [a@0 ASC,b@1 ASC,c@2 ASC]",
+    ///     "  SortPreservingRepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 2), input_partitions=2", (Partitioning can be roundrobin also)
+    ///     "    SortPreservingRepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 2), input_partitions=1", (Partitioning can be roundrobin also)
+    ///     "      MemoryExec: partitions=1, partition_sizes=[75]",
+    /// and / or
+    ///     "SortPreservingMergeExec: [a@0 ASC,b@1 ASC,c@2 ASC]",
+    ///     "  SortPreservingRepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 2), input_partitions=2", (Partitioning can be roundrobin also)
+    ///     "    RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 2), input_partitions=1", (Partitioning can be roundrobin also)
+    ///     "      MemoryExec: partitions=1, partition_sizes=[75]",
+    /// preserves ordering. Input fed to the plan above should be the same as the output of the plan.
+    async fn run_sort_preserving_repartition_test(

Review Comment:
   I think it would also help to document what `is_first_roundrobin` and `is_first_sort_preserving` mean here.



##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+use hashbrown::HashMap;
+use log::trace;
+use parking_lot::Mutex;
+use tokio::task::JoinHandle;
+
+type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>;
+type InputPartitionsToCurrentPartitionReceiver = Vec<DistributionReceiver<MaybeBatch>>;
+
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {
+    /// Channels for sending batches from input partitions to output partitions.
+    /// Key is the partition number.
+    channels: HashMap<
+        usize,
+        (
+            InputPartitionsToCurrentPartitionSender,
+            InputPartitionsToCurrentPartitionReceiver,
+            SharedMemoryReservation,
+        ),
+    >,
+
+    /// Helper that ensures that the background jobs are killed once they are no longer needed.
+    abort_helper: Arc<AbortOnDropMany<()>>,
+}
+
+/// The repartition operator maps N input partitions to M output partitions based on a
+/// partitioning scheme. Any input ordering is preserved.
+#[derive(Debug)]
+pub struct SortPreservingRepartitionExec {
+    /// Input execution plan
+    input: Arc<dyn ExecutionPlan>,
+
+    /// Partitioning scheme to use
+    partitioning: Partitioning,
+
+    /// Inner state that is initialized when the first output stream is created.
+    state: Arc<Mutex<SortPreservingRepartitionExecState>>,
+
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl SortPreservingRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.partitioning
+    }
+}
+
+impl ExecutionPlan for SortPreservingRepartitionExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(SortPreservingRepartitionExec::try_new(
+            children[0].clone(),
+            self.partitioning.clone(),
+        )?))
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.partitioning.clone()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        // Indicate that input ordering is preserved:
+        self.input().output_ordering()
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true]
+    }
+
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start SortPreservingRepartitionExec::execute for partition: {}",
+            partition
+        );
+        // lock mutexes
+        let mut state = self.state.lock();
+
+        let num_input_partitions = self.input.output_partitioning().partition_count();
+        let num_output_partitions = self.partitioning.partition_count();
+
+        // if this is the first partition to be invoked then we need to set up initial state
+        if state.channels.is_empty() {
+            // create one channel per *output* partition
+            // note we use a custom channel that ensures there is always data for each receiver
+            // but limits the amount of buffering if required.
+            let (txs, rxs) =
+                partition_aware_channels(num_input_partitions, num_output_partitions);
+            // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition
+            let txs = transpose(txs);
+            let rxs = transpose(rxs);
+            for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
+                let reservation = Arc::new(Mutex::new(
+                    MemoryConsumer::new(format!(
+                        "SortPreservingRepartitionExec[{partition}]"
+                    ))
+                    .register(context.memory_pool()),
+                ));
+                state.channels.insert(partition, (tx, rx, reservation));
+            }
+
+            // launch one async task per *input* partition
+            let mut join_handles = Vec::with_capacity(num_input_partitions);
+            for i in 0..num_input_partitions {
+                let txs: HashMap<_, _> = state
+                    .channels
+                    .iter()
+                    .map(|(partition, (tx, _rx, reservation))| {
+                        (*partition, (tx[i].clone(), Arc::clone(reservation)))
+                    })
+                    .collect();
+                let r_metrics = RepartitionMetrics::new(i, partition, &self.metrics);
+
+                let input_task: JoinHandle<Result<()>> = tokio::spawn(pull_from_input(
+                    self.input.clone(),
+                    i,
+                    txs.clone(),
+                    self.partitioning.clone(),
+                    r_metrics,
+                    context.clone(),
+                ));
+
+                // In a separate task, wait for each input to be done
+                // (and pass along any errors, including panic!s)
+                let join_handle = tokio::spawn(wait_for_task(
+                    AbortOnDropSingle::new(input_task),
+                    txs.into_iter()
+                        .map(|(partition, (tx, _reservation))| (partition, tx))
+                        .collect(),
+                ));
+                join_handles.push(join_handle);
+            }
+
+            state.abort_helper = Arc::new(AbortOnDropMany(join_handles))
+        }
+
+        trace!(
+            "Before returning stream in SortPreservingRepartitionExec::execute for partition: {}",
+            partition
+        );
+
+        // now return stream for the specified *output* partition which will
+        // read from the channels
+        let (_tx, rx, reservation) = state
+            .channels
+            .remove(&partition)
+            .expect("partition not used yet");
+
+        // Store streams from all the input partitions:
+        let input_streams = rx
+            .into_iter()
+            .map(|receiver| {
+                Box::pin(PerPartitionStream {
+                    schema: self.schema(),
+                    receiver,
+                    drop_helper: Arc::clone(&state.abort_helper),
+                    reservation: reservation.clone(),
+                }) as SendableRecordBatchStream
+            })
+            .collect::<Vec<_>>();
+        // Note that the number of receivers (`rx.len()`) equals `num_input_partitions`.
+
+        // Get existing ordering:
+        let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
+        // Merge streams (while preserving ordering) coming from input partitions to this partition:
+        streaming_merge(
+            input_streams,
+            self.schema(),
+            sort_exprs,
+            BaselineMetrics::new(&self.metrics, partition),
+            context.session_config().batch_size(),
+        )
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "SortPreservingRepartitionExec: partitioning={:?}, input_partitions={}",
+                    self.partitioning,
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.input.statistics()
+    }
+}
+
+impl SortPreservingRepartitionExec {
+    /// Create a new `SortPreservingRepartitionExec`.
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Self> {
+        Ok(SortPreservingRepartitionExec {
+            input,
+            partitioning,
+            state: Arc::new(Mutex::new(SortPreservingRepartitionExecState {
+                channels: HashMap::new(),
+                abort_helper: Arc::new(AbortOnDropMany::<()>(vec![])),
+            })),
+            metrics: ExecutionPlanMetricsSet::new(),
+        })
+    }
+}
+
+/// This struct converts a receiver to a stream.
+/// The receiver receives data on an SPSC channel.
+struct PerPartitionStream {
+    /// Schema wrapped by Arc
+    schema: SchemaRef,
+
+    /// channel containing the repartitioned batches
+    receiver: DistributionReceiver<MaybeBatch>,
+
+    /// Handle to ensure background tasks are killed when no longer needed.
+    #[allow(dead_code)]
+    drop_helper: Arc<AbortOnDropMany<()>>,
+
+    /// Memory reservation.
+    reservation: SharedMemoryReservation,
+}
+
+impl Stream for PerPartitionStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        match self.receiver.recv().poll_unpin(cx) {

Review Comment:
   You could use the [`ready!` macro](https://docs.rs/futures/0.3.28/futures/macro.ready.html) here, if you wanted, to save some repetition:
   
   ```rust
   // Requires `use futures::ready;` (or `std::task::ready`, available since Rust 1.64).
   match ready!(self.receiver.recv().poll_unpin(cx)) {
       Some(Some(v)) => {
           if let Ok(batch) = &v {
               self.reservation
                   .lock()
                   .shrink(batch.get_array_memory_size());
           }
           Poll::Ready(Some(v))
       }
       Some(None) => {
           // Input partition has finished sending batches
           Poll::Ready(None)
       }
       None => Poll::Ready(None),
   }
   ```
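For readers unfamiliar with the macro: `ready!` evaluates a `Poll`, returns `Poll::Pending` from the enclosing function early, and otherwise yields the value inside `Poll::Ready`. A minimal std-only sketch of the same flattening the suggestion performs, assuming `std::task::ready!` (which behaves like `futures::ready!`), a `Vec<u8>` as a stand-in for a batch, and a plain byte counter as a hypothetical stand-in for the shared memory reservation:

```rust
use std::task::{ready, Poll};

/// Hypothetical stand-in for the shared memory reservation: bytes still held.
struct Reservation {
    held: usize,
}

/// Mirrors the suggested `poll_next` body. The polled value is
/// `Option<Option<Result<..>>>`: `None` = channel closed, `Some(None)` = input done.
fn flatten_recv(
    polled: Poll<Option<Option<Result<Vec<u8>, String>>>>,
    reservation: &mut Reservation,
) -> Poll<Option<Result<Vec<u8>, String>>> {
    // `ready!` returns `Poll::Pending` early; otherwise it unwraps the Ready value.
    match ready!(polled) {
        Some(Some(v)) => {
            if let Ok(batch) = &v {
                // Release the memory accounted for this batch once it has been received.
                reservation.held = reservation.held.saturating_sub(batch.len());
            }
            Poll::Ready(Some(v))
        }
        // Input partition finished, or the channel closed: end of stream.
        Some(None) | None => Poll::Ready(None),
    }
}

fn main() {
    let mut res = Reservation { held: 10 };
    let out = flatten_recv(Poll::Ready(Some(Some(Ok(vec![1, 2, 3])))), &mut res);
    println!("{:?} held={}", out, res.held);
}
```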



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#discussion_r1240688252


##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {

Review Comment:
   > Do you have any ideas on how we can refactor to reduce duplication?
   
   There were two main differences I saw:
   1. The order-preserving variant has `Vec<DistributionSender>` and `Vec<DistributionReceiver>` per output partition, while the normal one has a single `DistributionSender` and `DistributionReceiver`
   2. When there is a `Vec<DistributionReceiver>`, the receivers need to be merged
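Both points come down to how the channels are indexed. In the diff, `partition_aware_channels(num_input_partitions, num_output_partitions)` yields senders and receivers that are then passed through `transpose`, so that `state.channels` holds, for each output partition, one sender and one receiver per input partition. A minimal sketch of that reshaping over plain vectors, assuming the grid starts out indexed `[input][output]`; this `transpose` is written for illustration and is not necessarily identical to the helper in `physical_plan::common`:

```rust
/// Turns a grid indexed `[input][output]` into one indexed `[output][input]`.
/// Assumes every row has the same length (panics on a longer row).
fn transpose<T>(grid: Vec<Vec<T>>) -> Vec<Vec<T>> {
    let n_cols = grid.first().map_or(0, |row| row.len());
    let mut out: Vec<Vec<T>> = (0..n_cols)
        .map(|_| Vec::with_capacity(grid.len()))
        .collect();
    for row in grid {
        for (j, item) in row.into_iter().enumerate() {
            out[j].push(item);
        }
    }
    out
}

fn main() {
    // Label each channel "in{i}->out{j}" for 2 input and 3 output partitions.
    let grid: Vec<Vec<String>> = (0..2)
        .map(|i| (0..3).map(|j| format!("in{i}->out{j}")).collect::<Vec<String>>())
        .collect();
    let per_output = transpose(grid);
    // per_output[j] now holds the channels feeding output partition j, one per input.
    for (j, chans) in per_output.iter().enumerate() {
        println!("output {j}: {chans:?}");
    }
}
```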
   
   I wonder if it would be possible to have the repartition operator always hold a `Vec<DistributionSender>` and `Vec<DistributionReceiver>`, and then special-case the situation where there is only one receiver (i.e. don't merge) 🤔 
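A rough sketch of that shape, assuming each output partition always owns a `Vec` of sorted inputs: with one input the data is passed through untouched (a fast path for the unordered case), and with several inputs a k-way merge keeps the global order, which is the role `streaming_merge` plays in the diff. Plain sorted `Vec<i64>`s stand in for record batch streams, and `merge_inputs` is a hypothetical name:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Combines the sorted inputs feeding one output partition.
/// A single input needs no merge; several inputs are k-way merged.
fn merge_inputs(mut inputs: Vec<Vec<i64>>) -> Vec<i64> {
    if inputs.len() == 1 {
        // Fast path: nothing to interleave, so the input order is the output order.
        return inputs.pop().unwrap();
    }
    // Min-heap (via `Reverse`) of (next value, input index, position in that input).
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(inputs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, i, pos))) = heap.pop() {
        out.push(value);
        // Refill from the input we just consumed, if it has more values.
        if let Some(&next) = inputs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

fn main() {
    // Two input partitions, each already sorted, feeding the same output partition.
    let merged = merge_inputs(vec![vec![1, 4, 7], vec![2, 3, 9]]);
    println!("{merged:?}");
}
```

Whether the extra `Vec` indirection costs anything measurable in the single-receiver case is exactly the open performance question raised later in this thread; the sketch only shows that the branch itself is cheap to express.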



##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {

Review Comment:
   > If there is no easy/obvious way, we can merge this and do another PR for refactoring/deduplication after some more in-depth thinking.
   
   I agree





[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#discussion_r1240933395


##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {

Review Comment:
   @crepererum, what do you think about @alamb's suggestion? Can we apply DRY here? Assuming we can, could that have any negative performance impact on the unordered case, or not?
   
   IIRC you worked on this operator before, so you may have useful insights here.





[GitHub] [arrow-datafusion] ozankabak commented on pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#issuecomment-1603122982

   We will soon follow up with a planner rule to use this repartitioner when order preservation is required 🚀 




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#discussion_r1242666567


##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -245,6 +253,9 @@ pub struct RepartitionExec {
 
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+
+    /// Boolean flag to decide whether to preserve ordering

Review Comment:
   👍 





[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#discussion_r1240044007


##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {

Review Comment:
   AFAIK @mustafasrepo tried to avoid duplication in general, but didn't undertake non-trivial refactoring to eliminate it entirely. Do you have any ideas on how we can refactor to reduce duplication?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#discussion_r1240091328


##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {

Review Comment:
   If there is no easy/obvious way, we can merge this and do another PR for refactoring/deduplication after some more in-depth thinking.





[GitHub] [arrow-datafusion] alamb commented on pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#issuecomment-1603089491

   I plan to review this carefully tomorrow




[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#discussion_r1241830460


##########
datafusion/core/src/physical_plan/repartition/sort_preserving_repartition.rs:
##########
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements an order-preserving repartitioning operator
+//! mapping N input partitions to M output partitions.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::SendableRecordBatchStream;
+
+use crate::physical_plan::common::{
+    transpose, AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation,
+};
+use crate::physical_plan::metrics::BaselineMetrics;
+use crate::physical_plan::repartition::distributor_channels::{
+    partition_aware_channels, DistributionReceiver, DistributionSender,
+};
+use crate::physical_plan::repartition::{
+    pull_from_input, wait_for_task, MaybeBatch, RepartitionMetrics,
+};
+use crate::physical_plan::sorts::streaming_merge;
+use crate::physical_plan::{
+    DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
+    RecordBatchStream, Statistics,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+use futures::{FutureExt, Stream};
+use hashbrown::HashMap;
+use log::trace;
+use parking_lot::Mutex;
+use tokio::task::JoinHandle;
+
+type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>;
+type InputPartitionsToCurrentPartitionReceiver = Vec<DistributionReceiver<MaybeBatch>>;
+
+/// Inner state of [`SortPreservingRepartitionExec`].
+#[derive(Debug)]
+struct SortPreservingRepartitionExecState {

Review Comment:
   I don't think deduplicating this would impact performance, at least not enough to justify duplicated code. So let's simplify if we can :+1: 
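The type aliases in the quoted hunk (`InputPartitionsToCurrentPartitionSender` / `...Receiver`, each a `Vec` of channels for one output partition) point to a topology with one dedicated channel per (input, output) pair. This contrasts with cloning a single sender per output partition, where batches from different inputs interleave in one receiver. The following is a minimal sketch of that topology under stated assumptions. It uses `std::sync::mpsc` in place of `partition_aware_channels`/`DistributionSender`, `u64` values in place of `RecordBatch`, and `value % num_outputs` in place of hash partitioning. `route_per_pair` is a hypothetical name.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical sketch: route every value of each (already sorted) input
/// partition to output partition `value % num_outputs`, using one dedicated
/// channel per (input, output) pair instead of one shared channel per output.
/// Returns `receivers[o][i]`: the stream from input `i` to output `o`.
fn route_per_pair(inputs: Vec<Vec<u64>>, num_outputs: usize) -> Vec<Vec<Receiver<u64>>> {
    // senders_by_input[i][o] feeds receivers[o][i].
    let mut senders_by_input: Vec<Vec<Sender<u64>>> =
        (0..inputs.len()).map(|_| Vec::with_capacity(num_outputs)).collect();
    let mut receivers: Vec<Vec<Receiver<u64>>> = Vec::with_capacity(num_outputs);
    for _ in 0..num_outputs {
        let mut per_input = Vec::with_capacity(inputs.len());
        for txs in senders_by_input.iter_mut() {
            let (tx, rx) = channel();
            txs.push(tx);
            per_input.push(rx);
        }
        receivers.push(per_input);
    }
    // One producer thread per input partition.
    for (input, txs) in inputs.into_iter().zip(senders_by_input) {
        thread::spawn(move || {
            for v in input {
                // Each channel is FIFO with a single producer, so values
                // from this input keep their relative order.
                txs[(v % num_outputs as u64) as usize].send(v).unwrap();
            }
            // `txs` is dropped here, closing this input's channels.
        });
    }
    receivers
}

fn main() {
    // Two sorted input partitions routed to three output partitions.
    let outputs = route_per_pair(vec![vec![0, 3, 4, 6, 9], vec![1, 2, 3, 7, 8]], 3);
    for (o, per_input) in outputs.into_iter().enumerate() {
        // `rx.iter()` ends once the producing thread has dropped its sender.
        let streams: Vec<Vec<u64>> = per_input.into_iter().map(|rx| rx.iter().collect()).collect();
        println!("output {o}: {streams:?}");
    }
}
```

For the inputs in `main`, output 0 receives `[0, 3, 6, 9]` from input 0 and `[3]` from input 1. These are two individually sorted streams, and they still have to be merged (the hunk imports `streaming_merge`) to produce a single ordered output partition.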





[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #6742: Order Preserving RepartitionExec Implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #6742:
URL: https://github.com/apache/arrow-datafusion/pull/6742#issuecomment-1607805443

   I have changed the implementation to minimize repetition as much as possible. With these changes, the diff against main is much smaller.
   
   At first, I thought the community would prefer a separate executor. However, maintaining two very similar implementations is a burden and error-prone. I think the current version is neat. Thanks for the suggestions @alamb @crepererum.

