Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/14 08:52:16 UTC

[GitHub] [arrow-ballista] Ted-Jiang opened a new pull request, #347: Using local shuffle reader avoid flight rpc call.

Ted-Jiang opened a new pull request, #347:
URL: https://github.com/apache/arrow-ballista/pull/347

   # Which issue does this PR close?
   
   Closes #346.
   
   # Rationale for this change
   
   # What changes are included in this PR?
   
   # Are there any user-facing changes?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-ballista] mingmwang commented on pull request #347: Using local shuffle reader avoid flight rpc call.

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#issuecomment-1281724747

   @Ted-Jiang Could you please add some log output indicating how many shuffle partitions are read locally and how many are read remotely?
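A minimal sketch of the summary log line requested above, assuming a simplified `PartitionLocation` with only a `host` field and a caller-supplied `is_local` predicate (both hypothetical; the real struct carries more metadata). It counts the two kinds of reads up front and formats one message, which a real implementation would emit through the `log` crate rather than return.

```rust
/// Hypothetical, simplified stand-in for Ballista's `PartitionLocation`;
/// only the executor host is modelled here.
struct PartitionLocation {
    host: String,
}

/// Count how many shuffle partitions will be read locally vs. fetched
/// remotely, and build a single log message describing the split.
/// `is_local` stands in for whatever locality check the reader uses.
fn summarize_reads<F: Fn(&str) -> bool>(
    locations: &[PartitionLocation],
    is_local: F,
) -> (usize, usize, String) {
    let local = locations
        .iter()
        .filter(|p| is_local(p.host.as_str()))
        .count();
    let remote = locations.len() - local;
    let msg = format!(
        "Shuffle read: {} partition(s) from local disk, {} partition(s) from remote executors",
        local, remote
    );
    (local, remote, msg)
}
```

Computing the counts once, before any fetch is spawned, keeps the log to one line per shuffle read instead of one line per partition.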




[GitHub] [arrow-ballista] andygrove commented on a diff in pull request #347: Using local shuffle reader avoid flight rpc call.

andygrove commented on code in PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#discussion_r996337833


##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -232,35 +267,53 @@ impl Stream for AbortableReceiverStream {
     }
 }
 
-fn send_fetch_partitions<R: PartitionReader + 'static>(
+fn send_fetch_partitions(
     partition_locations: Vec<PartitionLocation>,
     max_request_num: usize,
-    partition_reader: R,
 ) -> AbortableReceiverStream {
     let (response_sender, response_receiver) = mpsc::channel(max_request_num);
     let semaphore = Arc::new(Semaphore::new(max_request_num));
     let mut join_handles = vec![];
     for p in partition_locations.into_iter() {
-        let semaphore = semaphore.clone();
-        let response_sender = response_sender.clone();
-        let partition_reader_clone = partition_reader.clone();
-        let join_handle = tokio::spawn(async move {
-            // Block if exceeds max request number
-            let permit = semaphore.acquire_owned().await.unwrap();
-            let r = partition_reader_clone.fetch_partition(&p).await;
-            // Block if the channel buffer is full
-            if let Err(e) = response_sender.send(r).await {
-                error!("Fail to send response event to the channel due to {}", e);
-            }
-            // Increase semaphore by dropping existing permits.
-            drop(permit);
-        });
-        join_handles.push(join_handle);
+        if check_is_local_location(&p.executor_meta.host) {
+            // local shuffle reader should not be restricted
+            debug!("Get local partition file from {}", &p.executor_meta.host);
+            let response_sender = response_sender.clone();
+            let join_handle = tokio::spawn(async move {
+                let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
+                if let Err(e) = response_sender.send(r).await {
+                    error!("Fail to send response event to the channel due to {}", e);
+                }
+            });
+            join_handles.push(join_handle);
+        } else {
+            debug!("Get remote partition file from {}", &p.executor_meta.host);
+            let semaphore = semaphore.clone();
+            let response_sender = response_sender.clone();
+            let join_handle = tokio::spawn(async move {
+                // Block if exceeds max request number
+                let permit = semaphore.acquire_owned().await.unwrap();
+                let r = PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
+                // Block if the channel buffer is full
+                if let Err(e) = response_sender.send(r).await {
+                    error!("Fail to send response event to the channel due to {}", e);
+                }
+                // Increase semaphore by dropping existing permits.
+                drop(permit);
+            });
+            join_handles.push(join_handle);
+        }
     }
 
     AbortableReceiverStream::create(response_receiver, join_handles)
 }
 
+fn check_is_local_location(host: &str) -> bool {

Review Comment:
   Filed https://github.com/apache/arrow-ballista/issues/356 so we can discuss this as a follow-on.
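The diff above routes each partition by locality: local reads are spawned without a permit, while remote Flight fetches must first acquire one of `max_request_num` semaphore permits. The sketch below illustrates that policy with plain `std` threads and a hand-rolled counting semaphore instead of `tokio::spawn` and `tokio::sync::Semaphore`; the `Location` type, the `fetch_all` function, and the simulated 5 ms RPC are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Counting semaphore built from a Mutex and a Condvar
/// (stand-in for `tokio::sync::Semaphore` in this sketch).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiting thread.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Hypothetical, simplified partition location.
struct Location {
    id: usize,
    is_local: bool,
}

/// Fetch every partition on its own thread. Local reads run unthrottled;
/// remote reads must hold one of `max_request_num` permits. Returns the
/// fetched ids (in completion order) and the peak number of concurrent
/// remote fetches observed.
fn fetch_all(locations: Vec<Location>, max_request_num: usize) -> (Vec<usize>, usize) {
    let semaphore = Arc::new(Semaphore::new(max_request_num));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for loc in locations {
        let tx = tx.clone();
        let semaphore = Arc::clone(&semaphore);
        let in_flight = Arc::clone(&in_flight);
        let peak = Arc::clone(&peak);
        handles.push(thread::spawn(move || {
            if loc.is_local {
                // Local read: no permit needed (stands in for reading the file from disk).
                tx.send(loc.id).unwrap();
            } else {
                // Remote read: wait for a permit before issuing the "RPC".
                semaphore.acquire();
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // simulated Flight call
                in_flight.fetch_sub(1, Ordering::SeqCst);
                tx.send(loc.id).unwrap();
                // Free the permit only after the result has been sent.
                semaphore.release();
            }
        }));
    }
    drop(tx); // so `rx` ends once every worker has finished
    for handle in handles {
        handle.join().unwrap();
    }
    let ids: Vec<usize> = rx.iter().collect();
    (ids, peak.load(Ordering::SeqCst))
}
```

Because only the remote branch takes a permit, the number of concurrent remote fetches never exceeds `max_request_num`, while local reads are not held back behind slow network calls.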





[GitHub] [arrow-ballista] Ted-Jiang commented on a diff in pull request #347: Using local shuffle reader avoid flight rpc call.

Ted-Jiang commented on code in PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#discussion_r996405264


##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
+fn check_is_local_location(host: &str) -> bool {

Review Comment:
   Sounds like a better approach🤔





[GitHub] [arrow-ballista] andygrove commented on a diff in pull request #347: Using local shuffle reader avoid flight rpc call.

andygrove commented on code in PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#discussion_r995794572


##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
+fn check_is_local_location(host: &str) -> bool {

Review Comment:
   An alternative approach here would be to check on disk whether the shuffle files exist; that way this would also work in containerized environments such as k8s.
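A minimal sketch of the on-disk check suggested above, assuming the shuffle file path for a partition is known to the reader. The `choose_reader` helper and `ReaderChoice` enum are hypothetical, not Ballista API. Instead of comparing host names, which may not match inside containers, it asks the filesystem whether the file is present.

```rust
use std::path::Path;

/// Which reader a partition should use (hypothetical).
#[derive(Debug, PartialEq)]
enum ReaderChoice {
    Local,
    Remote,
}

/// Hypothetical locality check: a partition is treated as local if its
/// shuffle file exists on this executor's filesystem, regardless of the
/// host name recorded in the partition metadata.
fn choose_reader<P: AsRef<Path>>(shuffle_file: P) -> ReaderChoice {
    if shuffle_file.as_ref().is_file() {
        ReaderChoice::Local
    } else {
        ReaderChoice::Remote
    }
}
```

One trade-off of this design: it costs a filesystem call per partition, and it treats any reachable copy of the file (for example on a shared volume) as local.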





[GitHub] [arrow-ballista] Ted-Jiang commented on a diff in pull request #347: Using local shuffle reader avoid flight rpc call.

Ted-Jiang commented on code in PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#discussion_r995523029


##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -498,36 +639,37 @@ mod tests {
         Schema::new(vec![Field::new("id", DataType::Int32, false)])
     }
 
-    #[derive(Clone)]
-    struct MockPartitionReader {}

Review Comment:
   This will be replaced by `PartitionReaderEnum::Local`.
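Replacing the generic `R: PartitionReader` parameter with an enum means tests can exercise the `Local` variant against real files instead of a mock. The sketch below is synchronous and hypothetical: only the variant names `Local` and `FlightRemote` come from the diff; the `PartitionLocation` fields and the string results are illustrative stand-ins for the real async `fetch_partition`.

```rust
/// Enum-based dispatch: each variant is one way of reading a shuffle
/// partition, so callers no longer need a generic reader parameter.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PartitionReaderEnum {
    Local,
    FlightRemote,
}

/// Hypothetical, simplified partition location.
struct PartitionLocation {
    path: String,
    host: String,
}

impl PartitionReaderEnum {
    /// Dispatch on the variant; the returned strings stand in for record batches.
    fn fetch_partition(&self, location: &PartitionLocation) -> Result<String, String> {
        match self {
            PartitionReaderEnum::Local => {
                Ok(format!("read {} from local disk", location.path))
            }
            PartitionReaderEnum::FlightRemote => Ok(format!(
                "fetched {} from {} via Flight",
                location.path, location.host
            )),
        }
    }
}
```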





[GitHub] [arrow-ballista] Ted-Jiang commented on pull request #347: Using local shuffle reader avoid flight rpc call.

Ted-Jiang commented on PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#issuecomment-1279915752

   > This is awesome! Thanks @Ted-Jiang. I see a nice 30% speedup when running queries locally. 🚀
   
   @andygrove Cool! We can take shuffle locality into account in the future!👍




[GitHub] [arrow-ballista] Ted-Jiang commented on pull request #347: Using local shuffle reader avoid flight rpc call.

Ted-Jiang commented on PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#issuecomment-1278997885

   @mingmwang @andygrove @yahoNanJing PTAL😊




[GitHub] [arrow-ballista] mingmwang commented on pull request #347: Using local shuffle reader avoid flight rpc call.

mingmwang commented on PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#issuecomment-1281731521

   @Ted-Jiang 
   
   Please pay attention to the error handling: the remote and local shuffle readers should both return the same error when they encounter issues, so that the Scheduler can reschedule tasks/stages.
   
   `BallistaError::FetchFailed`
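A minimal sketch of the requested error unification, assuming a simplified, hypothetical `ShuffleError` enum in place of `BallistaError` (whose real `FetchFailed` variant carries its own fields). Both the local file read and the (simulated) remote fetch map their failures into the same `FetchFailed` variant, so the scheduler-side check does not need to know which reader failed.

```rust
/// Hypothetical, simplified error type standing in for `BallistaError`.
#[derive(Debug)]
enum ShuffleError {
    /// Any failure to obtain a shuffle partition, local or remote.
    FetchFailed { executor_id: String, message: String },
    /// Errors that, in this sketch, should not trigger rescheduling.
    Internal(String),
}

/// Local read: turn any I/O error into `FetchFailed`.
fn read_local(path: &str, executor_id: &str) -> Result<Vec<u8>, ShuffleError> {
    std::fs::read(path).map_err(|e| ShuffleError::FetchFailed {
        executor_id: executor_id.to_string(),
        message: format!("local shuffle read of {} failed: {}", path, e),
    })
}

/// Remote read (simulated by `rpc_result`): turn the RPC error into the same variant.
fn read_remote(
    rpc_result: Result<Vec<u8>, String>,
    executor_id: &str,
) -> Result<Vec<u8>, ShuffleError> {
    rpc_result.map_err(|e| ShuffleError::FetchFailed {
        executor_id: executor_id.to_string(),
        message: format!("remote shuffle fetch failed: {}", e),
    })
}

/// Scheduler-side view in this sketch: only fetch failures are reschedulable.
fn should_reschedule(err: &ShuffleError) -> bool {
    matches!(err, ShuffleError::FetchFailed { .. })
}
```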




[GitHub] [arrow-ballista] andygrove merged pull request #347: Using local shuffle reader avoid flight rpc call.

andygrove merged PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347




[GitHub] [arrow-ballista] Ted-Jiang commented on pull request #347: Using local shuffle reader avoid flight rpc call.

Ted-Jiang commented on PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#issuecomment-1281740247

   > @Ted-Jiang
   > 
   > Please pay attention to the error handling: the remote and local shuffle readers should both return the same error when they encounter issues, so that the Scheduler can reschedule tasks/stages.
   > 
   > `BallistaError::FetchFailed`
   
   Sure!

