Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:13:49 UTC

[GitHub] [arrow-datafusion] edrevo commented on a change in pull request #541: ShuffleReaderExec now supports multiple locations per partition

edrevo commented on a change in pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#discussion_r650183206



##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))

Review comment:
       nit: if you change `fetch_partition` to take a reference, you can avoid the `.clone()`:
   
   ```suggestion
           let partition_locations = &self.partition[partition];
           let result = future::join_all(partition_locations.iter().map(fetch_partition))
   ```
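The borrow-instead-of-clone idea can be illustrated without the async machinery. Below is a minimal synchronous sketch. It assumes hypothetical stand-ins for `PartitionLocation` and `fetch_partition`, which are not Ballista's real definitions. Because `fetch_partition` takes `&PartitionLocation`, the items yielded by `iter()` can be passed to it directly, so nothing has to be cloned. Collecting into `Result<Vec<_>, _>` mirrors the `.collect::<Result<Vec<_>>>()?` step in the diff.

```rust
// Hypothetical stand-ins for Ballista's types; fields are illustrative only.
#[derive(Debug)]
struct PartitionLocation {
    host: String,
    port: u16,
}

#[derive(Debug, PartialEq)]
struct FetchError(String);

// Hypothetical synchronous stand-in for `fetch_partition`: it borrows the
// location instead of taking ownership, so callers never need `.clone()`.
fn fetch_partition(location: &PartitionLocation) -> Result<String, FetchError> {
    if location.port == 0 {
        return Err(FetchError(format!("invalid port for {}", location.host)));
    }
    Ok(format!("{}:{}", location.host, location.port))
}

fn fetch_all(locations: &[PartitionLocation]) -> Result<Vec<String>, FetchError> {
    // `iter()` yields `&PartitionLocation`, matching the borrowed parameter,
    // so `fetch_partition` can be handed to `map` as-is.
    // Collecting into `Result<Vec<_>, _>` stops at the first error.
    locations.iter().map(fetch_partition).collect()
}

fn main() {
    let partition = vec![
        PartitionLocation { host: "executor-1".to_string(), port: 50051 },
        PartitionLocation { host: "executor-2".to_string(), port: 50052 },
    ];
    let fetched = fetch_all(&partition).unwrap();
    println!("{:?}", fetched);
    // `partition` is still usable here because it was only borrowed.
    println!("{} locations", partition.len());

    let bad = vec![PartitionLocation { host: "executor-3".to_string(), port: 0 }];
    if let Err(FetchError(msg)) = fetch_all(&bad) {
        println!("error: {}", msg);
    }
}
```

In the async version the same signature change lets `partition_locations.iter().map(fetch_partition)` feed `future::join_all`. Each returned future then borrows from `self` instead of owning a cloned `PartitionLocation`.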

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       > I wonder if it will serialize them all (aka not start reading from the second until the first is entirely consumed)?
   
   Yes, that's exactly what it does: `flatten` reads each stream to completion before it starts polling the next one.
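In the diff, `future::join_all` drives the `fetch_partition` futures concurrently. However, `futures::stream::iter(result).flatten()` then reads the resulting streams one after another. `StreamExt::flatten` uses the same ordering as the standard library's `Iterator::flatten`, so the synchronous analogy below shows the read order. It uses hypothetical in-memory "streams" of batch names, not the PR's code. Every batch from the first location is yielded before any batch from the second.

```rust
// Synchronous analogy: each inner Vec stands in for one partition location's
// record-batch stream (hypothetical data, for illustration only).
fn read_all<'a>(streams: Vec<Vec<&'a str>>) -> Vec<&'a str> {
    // `flatten` drains the first inner iterator completely before it pulls
    // a single item from the second, and so on.
    streams.into_iter().flatten().collect()
}

fn main() {
    let streams = vec![
        vec!["loc1-batch1", "loc1-batch2"],
        vec!["loc2-batch1", "loc2-batch2"],
    ];
    // prints ["loc1-batch1", "loc1-batch2", "loc2-batch1", "loc2-batch2"]
    println!("{:?}", read_all(streams));
}
```

If interleaved reads were preferred, `futures::stream::select_all` polls all inner streams and yields items from whichever one is ready. Whether that trade-off is desirable here is a separate design question.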