Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/03 14:32:13 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #2428: Fix Ballista executing during plan

tustvold opened a new pull request, #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428

   # Which issue does this PR close?
   
   Part of #2307 
   
   Closes #2426 
   
   # Rationale for this change
   
   `ExecutionPlan::execute` is expected to defer computation to the returned `SendableRecordBatchStream`; this is necessary for result streaming to work correctly. Currently the Ballista `ExecutionPlan` instead evaluates the plan inside `ExecutionPlan::execute`, which is not correct.
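
   To make the distinction concrete, here is a minimal std-only sketch of eager versus deferred evaluation. It is an analogy rather than the code in this PR: a plain `Iterator` stands in for `SendableRecordBatchStream`, and the names `execute_eager`, `execute_deferred` and the `work_done` flag are hypothetical.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Eager variant (the behaviour described as incorrect): the work runs
/// inside the call itself, before the caller has pulled a single item.
/// `work_done` is a hypothetical flag standing in for "the query was run".
fn execute_eager(work_done: Rc<Cell<bool>>) -> impl Iterator<Item = u32> {
    work_done.set(true); // side effect happens during `execute`
    vec![1, 2, 3].into_iter()
}

/// Deferred variant: the call only builds the iterator. The closure, and
/// therefore the work, runs when the first item is requested.
fn execute_deferred(work_done: Rc<Cell<bool>>) -> impl Iterator<Item = u32> {
    std::iter::once(()).flat_map(move |_| {
        work_done.set(true); // side effect happens on first `next()`
        vec![1, 2, 3]
    })
}
```

   With the deferred variant the flag stays `false` after the call returns and only flips once the consumer calls `next()`, which is the property a streaming consumer relies on.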
   
   # What changes are included in this PR?
   
   Defers plan evaluation to the `SendableRecordBatchStream` 
   
   # Are there any user-facing changes?
   
   Yes, I replaced `ballista::core::WrappedStream` with the `RecordBatchStreamAdapter` from DataFusion
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #2428: Fix Ballista executing during plan

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428#issuecomment-1116186649

   I skimmed through this and it LGTM, but I have not been active in Ballista lately, so it would be good if @mingmwang, @yahoNanJing, or @yjshen could also take a look.




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2428: Fix Ballista executing during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428#discussion_r863850045


##########
ballista/rust/core/src/execution_plans/distributed_query.rs:
##########
@@ -299,6 +233,79 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
     }
 }
 
+async fn execute_query(
+    scheduler_url: String,
+    session_id: String,
+    query: ExecuteQueryParams,
+) -> Result<impl Stream<Item = ArrowResult<RecordBatch>> + Send> {
+    info!("Connecting to Ballista scheduler at {}", scheduler_url);
+    // TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
+
+    let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+
+    let query_result = scheduler
+        .execute_query(query)
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
+        .into_inner();
+
+    assert_eq!(
+        session_id, query_result.session_id,
+        "Session id inconsistent between Client and Server side in DistributedQueryExec."
+    );
+
+    let job_id = query_result.job_id;
+    let mut prev_status: Option<job_status::Status> = None;
+
+    loop {
+        let GetJobStatusResult { status } = scheduler
+            .get_job_status(GetJobStatusParams {
+                job_id: job_id.clone(),
+            })
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
+            .into_inner();
+        let status = status.and_then(|s| s.status).ok_or_else(|| {
+            DataFusionError::Internal("Received empty status message".to_owned())
+        })?;
+        let wait_future = tokio::time::sleep(Duration::from_millis(100));
+        let has_status_change = prev_status.map(|x| x != status).unwrap_or(true);
+        match status {
+            job_status::Status::Queued(_) => {
+                if has_status_change {
+                    info!("Job {} still queued...", job_id);
+                }
+                wait_future.await;
+                prev_status = Some(status);
+            }
+            job_status::Status::Running(_) => {
+                if has_status_change {
+                    info!("Job {} is running...", job_id);
+                }
+                wait_future.await;
+                prev_status = Some(status);
+            }
+            job_status::Status::Failed(err) => {
+                let msg = format!("Job {} failed: {}", job_id, err.error);
+                error!("{}", msg);
+                break Err(DataFusionError::Execution(msg));
+            }
+            job_status::Status::Completed(completed) => {
+                let streams = completed.partition_location.into_iter().map(|p| {

Review Comment:
   This is a breaking change: previously it would buffer up all records from all partitions and then yield them in partition order; now it will stream them, potentially interleaving results from different partitions. I'm fairly certain this is fine, but I want to draw attention to it.
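
   As a rough illustration of the ordering difference, the std-only sketch below contrasts the two behaviours on in-memory data. It is not the code in this PR: both function names are hypothetical, and round-robin is used only as a deterministic stand-in, since a real stream would yield whichever partition has data ready first.

```rust
/// Old behaviour as described above: collect everything, then yield the
/// batches strictly in partition order.
fn buffered_in_partition_order<T>(partitions: Vec<Vec<T>>) -> Vec<T> {
    partitions.into_iter().flatten().collect()
}

/// One possible interleaving (round-robin across partitions). Real stream
/// interleaving depends on readiness, so the exact order is not guaranteed.
fn round_robin_interleave<T>(partitions: Vec<Vec<T>>) -> Vec<T> {
    let mut iters: Vec<_> = partitions.into_iter().map(|p| p.into_iter()).collect();
    let mut out = Vec::new();
    loop {
        let mut progressed = false;
        // Take at most one batch from each partition per round.
        for it in iters.iter_mut() {
            if let Some(batch) = it.next() {
                out.push(batch);
                progressed = true;
            }
        }
        // Stop once every partition is exhausted.
        if !progressed {
            break;
        }
    }
    out
}
```

   Both functions return the same set of batches; only the order differs, so a consumer is affected only if it relied on results arriving grouped by partition.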





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2428: Fix Ballista executing during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428#discussion_r863846420


##########
ballista/rust/core/src/execution_plans/distributed_query.rs:
##########
@@ -168,15 +169,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
     ) -> Result<SendableRecordBatchStream> {
         assert_eq!(0, partition);
 
-        info!("Connecting to Ballista scheduler at {}", self.scheduler_url);
-        // TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
-
-        let mut scheduler = SchedulerGrpcClient::connect(self.scheduler_url.clone())
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-
-        let schema: Schema = self.plan.schema().as_ref().clone().into();
-
         let mut buf: Vec<u8> = vec![];

Review Comment:
   I recommend reviewing this without whitespace
   
   https://github.com/apache/arrow-datafusion/pull/2428/files?w=1





[GitHub] [arrow-datafusion] andygrove merged pull request #2428: Fix Ballista executing during plan

Posted by GitBox <gi...@apache.org>.
andygrove merged PR #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428




[GitHub] [arrow-datafusion] tustvold commented on pull request #2428: Fix Ballista executing during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428#issuecomment-1116214025

   I've backed out the "fix" for #2426, as it was causing test failures and isn't on the critical path to making `ExecutionPlan` sync. I lack sufficient knowledge of Ballista to diagnose this, so I will leave it for someone with a better grasp of what is going on.




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2428: Fix Ballista executing during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428#discussion_r863851407


##########
ballista/rust/core/src/utils.rs:
##########
@@ -316,38 +313,3 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
         }
     }
 }
-
-pub struct WrappedStream {

Review Comment:
   This has been replaced by `RecordBatchStreamAdapter`, which does not require boxing or pinning the stream, making it not only easier to use but also potentially slightly faster.





[GitHub] [arrow-datafusion] andygrove commented on pull request #2428: Fix Ballista executing during plan

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2428:
URL: https://github.com/apache/arrow-datafusion/pull/2428#issuecomment-1117334119

   @tustvold I am working on fixing the Ballista integration tests; I will then test with this PR and approve, assuming no issues come up.

