You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:13:39 UTC

[GitHub] [arrow-datafusion] andygrove commented on a change in pull request #541: WIP: ShuffleReaderExec now supports multiple locations per partition

andygrove commented on a change in pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#discussion_r650138952



##########
File path: ballista/rust/client/src/context.rs
##########
@@ -74,32 +71,6 @@ impl BallistaContextState {
     }
 }
 
-struct WrappedStream {
-    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
-    schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-impl Stream for WrappedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-

Review comment:
       This is moved to ballista-core utils

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),
+            Arc::new(self.schema.as_ref().clone()),
+        );
+        Ok(Box::pin(result))
     }

Review comment:
       This is the main change

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       Once the basic shuffle mechanism is implemented, there will be a lot of optimization work to follow




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org