Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/25 07:51:19 UTC

[GitHub] [arrow-datafusion] Ted-Jiang opened a new pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Ted-Jiang opened a new pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883


   …different tokio runtimes in ballista_executor
   
   
   
   Closes #1770.
   
    # Rationale for this change
   
   # What changes are included in this PR?
   No
   
   # Are there any user-facing changes?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1054945828


   retest this plz.





[GitHub] [arrow-datafusion] alamb merged pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883


   





[GitHub] [arrow-datafusion] yordan-pavlov commented on a change in pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#discussion_r815037476



##########
File path: ballista/rust/executor/src/cpu_bound_executor.rs
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Inspired by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+//! This module contains a dedicated thread pool for running "cpu
+//! intensive" workloads such as DataFusion plans
+
+//! This module contains a dedicated thread pool for running "cpu
+//! intensive" workloads such as DataFusion plans
+
+use log::warn;
+use parking_lot::Mutex;
+use std::{pin::Pin, sync::Arc};
+use tokio::sync::oneshot::Receiver;
+
+use futures::Future;
+
+/// The type of thing that the dedicated executor runs
+type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio runtime
+#[derive(Clone)]
+pub struct DedicatedExecutor {

Review comment:
       I think it would be useful to add some comments to explain why a dedicated executor is necessary / useful; maybe even include a link to @alamb's blog post on the topic
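To illustrate why a dedicated executor is useful, here is a minimal std-only sketch. It swaps the PR's separate tokio runtime for a plain pool of OS threads (an assumption made to keep the example dependency-free), but the shape is the same: CPU-heavy jobs travel over a `std::sync::mpsc` channel to the pool, each job reports its result on its own channel (standing in for the `oneshot::Receiver` that `spawn` returns), and the calling thread, which plays the role of the runtime that answers heartbeats, is never blocked by the computation. `CpuPool`, `busy_sum`, and `heartbeat_until_done` are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// A boxed unit of CPU-bound work (the PR boxes futures instead).
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical std-only stand-in for `DedicatedExecutor`.
pub struct CpuPool {
    requests: Sender<Job>,
    _workers: Vec<JoinHandle<()>>,
}

impl CpuPool {
    pub fn new(thread_name: &str, num_threads: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        // std's Receiver cannot be cloned, so the workers share it behind a Mutex.
        let rx = Arc::new(Mutex::new(rx));
        let workers = (0..num_threads.max(1))
            .map(|i| {
                let rx = Arc::clone(&rx);
                thread::Builder::new()
                    .name(format!("{}-{}", thread_name, i))
                    .spawn(move || loop {
                        // The lock guard is dropped at the end of this statement,
                        // so other workers can dequeue while this job runs.
                        let next = rx.lock().unwrap().recv();
                        match next {
                            Ok(job) => job(),
                            // Every Sender is gone: the pool was dropped.
                            Err(_) => break,
                        }
                    })
                    .expect("spawning worker thread")
            })
            .collect();
        Self { requests: tx, _workers: workers }
    }

    /// Runs `f` on the pool; the returned receiver plays the role of the
    /// PR's `oneshot::Receiver`.
    pub fn spawn<F, T>(&self, f: F) -> Receiver<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // The caller may have dropped its receiver; ignore that case.
            let _ = tx.send(f());
        });
        self.requests.send(job).expect("all workers have exited");
        rx
    }
}

/// A deliberately CPU-heavy computation (sum of squares below `n`).
pub fn busy_sum(n: u64) -> u64 {
    (0..n).fold(0u64, |acc, x| acc.wrapping_add(x.wrapping_mul(x)))
}

/// Keeps "heartbeating" on the calling thread until the result arrives,
/// returning the result and how many heartbeats were sent meanwhile.
pub fn heartbeat_until_done<T>(rx: &Receiver<T>, interval: Duration) -> (T, usize) {
    let mut beats = 0;
    loop {
        match rx.try_recv() {
            Ok(value) => return (value, beats),
            Err(TryRecvError::Empty) => {
                beats += 1; // a real executor would send a heartbeat here
                thread::sleep(interval);
            }
            Err(TryRecvError::Disconnected) => panic!("worker dropped the result"),
        }
    }
}
```

For example, `heartbeat_until_done(&pool.spawn(|| busy_sum(50_000_000)), Duration::from_millis(100))` keeps the caller ticking while the sum runs on a worker thread. If the same computation ran on the calling thread instead, no heartbeat could be sent until it finished.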







[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#discussion_r816400239



##########
File path: ballista/rust/executor/src/cpu_bound_executor.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Inspired by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+
+//! This module contains a dedicated thread pool for running "cpu
+//! intensive" workloads as query plans
+
+use log::warn;
+use parking_lot::Mutex;
+use std::{pin::Pin, sync::Arc};
+use tokio::sync::oneshot::Receiver;
+
+use futures::Future;
+
+/// The type of thing that the dedicated executor runs
+type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio runtime, e.g. to keep CPU-bound tasks (executing a DataFusion plan)
+/// apart from IO-bound tasks (heartbeats). See the blog post linked above for details.
+#[derive(Clone)]
+pub struct DedicatedExecutor {
+    state: Arc<Mutex<State>>,
+}
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio Executor
+struct State {
+    /// The number of threads in this pool
+    num_threads: usize,
+
+    /// The name of the threads for this executor
+    thread_name: String,
+
+    /// Channel for requests -- the dedicated executor takes requests
+    /// from here and runs them.
+    requests: Option<std::sync::mpsc::Sender<Task>>,
+
+    /// The thread that is doing the work
+    thread: Option<std::thread::JoinHandle<()>>,
+}
+
+/// The default worker priority (value passed to `libc::setpriority`);
+const WORKER_PRIORITY: i32 = 10;
+
+impl std::fmt::Debug for DedicatedExecutor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let state = self.state.lock();
+
+        let mut d = f.debug_struct("DedicatedExecutor");
+
+        d.field("num_threads", &state.num_threads)
+            .field("thread_name", &state.thread_name);
+
+        if state.requests.is_some() {
+            d.field("requests", &"Some(...)")
+        } else {
+            d.field("requests", &"None")
+        };
+
+        if state.thread.is_some() {
+            d.field("thread", &"Some(...)")
+        } else {
+            d.field("thread", &"None")
+        };
+
+        d.finish()
+    }
+}
+
+impl DedicatedExecutor {
+    /// https://stackoverflow.com/questions/62536566
+    /// Creates a new `DedicatedExecutor` with a dedicated tokio
+    /// runtime that is separate from the `[tokio::main]` threadpool.
+    ///
+    /// The worker thread priority is set to low so that such tasks do
+    /// not starve other more important tasks (such as answering health checks)
+    ///
+    pub fn new(thread_name: impl Into<String>, num_threads: usize) -> Self {
+        let thread_name = thread_name.into();
+        let name_copy = thread_name.to_string();
+
+        let (tx, rx) = std::sync::mpsc::channel();
+
+        // Cannot create a separate tokio runtime inside another tokio runtime,
+        // so use std::thread to spawn a thread for it
+        let thread = std::thread::spawn(move || {
+            let runtime = tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .thread_name(&name_copy)
+                .worker_threads(num_threads)
+                .on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY))
+                .build()
+                .expect("Creating tokio runtime");
+
+            // By entering the context, all calls to `tokio::spawn` go
+            // to this executor
+            let _guard = runtime.enter();
+
+            while let Ok(request) = rx.recv() {
+                // TODO feedback request status
+                tokio::task::spawn(request);
+            }
+        });
+
+        let state = State {
+            num_threads,
+            thread_name,
+            requests: Some(tx),
+            thread: Some(thread),
+        };
+
+        Self {
+            state: Arc::new(Mutex::new(state)),
+        }
+    }
+
+    /// Runs the specified Future (and any tasks it spawns) on the
+    /// `DedicatedExecutor`.
+    ///
+    /// Currently all tasks are added to the tokio executor
+    /// immediately and compete for the threadpool's resources.
+    pub fn spawn<T>(&self, task: T) -> Receiver<T::Output>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+
+        // wrap the task so that its output is sent to the returned receiver
+        let job = Box::pin(async move {
+            let task_output = task.await;
+            if tx.send(task_output).is_err() {
+                warn!("Spawned task output ignored: receiver dropped");
+            }
+        });
+
+        let mut state = self.state.lock();
+
+        if let Some(requests) = &mut state.requests {
+            // would fail if someone has started shutdown
+            requests.send(job).ok();
+        } else {
+            warn!("tried to schedule task on an executor that was shutdown");
+        }
+
+        rx
+    }
+
+    /// signals shutdown of this executor and any Clones
+    #[allow(dead_code)]

Review comment:
       This is to satisfy `clippy`; once closing the executor is implemented, I will delete it.
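A minimal std-only sketch of what closing the executor could look like, assuming the same `Option<Sender>` and `Option<JoinHandle>` layout as `State` but running jobs directly on one plain thread instead of inside a tokio runtime. Shutdown simply drops the only `Sender`, which makes the worker's `recv` loop exit after draining the queue; `join` then waits for the thread. `Executor`, `shutdown`, and `join` here are hypothetical. Once real code calls such a method (for example when the executor process stops), the `dead_code` lint no longer fires and the `#[allow(dead_code)]` can go.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Mirrors the PR's `State`: both fields become `None` after shutdown/join.
struct State {
    requests: Option<Sender<Job>>,
    thread: Option<JoinHandle<()>>,
}

/// Hypothetical executor; all clones share one `State`.
#[derive(Clone)]
pub struct Executor {
    state: Arc<Mutex<State>>,
}

impl Executor {
    pub fn new(thread_name: &str) -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        let thread = thread::Builder::new()
            .name(thread_name.to_string())
            .spawn(move || {
                // `recv` keeps returning queued jobs and only fails once the
                // channel is empty and every Sender has been dropped.
                while let Ok(job) = rx.recv() {
                    job();
                }
            })
            .expect("spawning executor thread");
        let state = State { requests: Some(tx), thread: Some(thread) };
        Self { state: Arc::new(Mutex::new(state)) }
    }

    /// Queues `job`; returns `false` if the executor was already shut down.
    pub fn spawn(&self, job: impl FnOnce() + Send + 'static) -> bool {
        let state = self.state.lock().unwrap();
        match &state.requests {
            Some(tx) => tx.send(Box::new(job)).is_ok(),
            None => false,
        }
    }

    /// Signals shutdown of this executor and all of its clones by dropping
    /// the only `Sender`.
    pub fn shutdown(&self) {
        self.state.lock().unwrap().requests = None;
    }

    /// Shuts down, then waits for the jobs already queued to finish.
    pub fn join(&self) {
        self.shutdown();
        // Take the handle out first so the lock is not held while joining.
        let handle = self.state.lock().unwrap().thread.take();
        if let Some(handle) = handle {
            handle.join().expect("executor thread panicked");
        }
    }
}
```

Because every clone shares the same `State`, calling `shutdown` on any clone stops new work for all of them, which matches the "this executor and any Clones" wording in the PR's doc comment.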







[GitHub] [arrow-datafusion] Ted-Jiang removed a comment on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang removed a comment on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1054942649


   retest this please.





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#discussion_r815975802



##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -261,6 +262,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
         let executor_server = self.executor_server.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");
+            //TODO make it configurable

Review comment:
       ```suggestion
               // Use a dedicated executor for CPU bound tasks so that the main tokio
               // executor can still answer requests even when under load
               //TODO make it configurable
   ```

##########
File path: ballista/rust/executor/src/cpu_bound_executor.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Inspired by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+
+//! This module contains a dedicated thread pool for running "cpu
+//! intensive" workloads as query plans
+
+use log::warn;
+use parking_lot::Mutex;
+use std::{pin::Pin, sync::Arc};
+use tokio::sync::oneshot::Receiver;
+
+use futures::Future;
+
+/// The type of thing that the dedicated executor runs
+type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio runtime, e.g. to keep CPU-bound tasks (executing a DataFusion plan)
+/// apart from IO-bound tasks (heartbeats). See the blog post linked above for details.
+#[derive(Clone)]
+pub struct DedicatedExecutor {
+    state: Arc<Mutex<State>>,
+}
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio Executor
+struct State {
+    /// The number of threads in this pool
+    num_threads: usize,
+
+    /// The name of the threads for this executor
+    thread_name: String,
+
+    /// Channel for requests -- the dedicated executor takes requests
+    /// from here and runs them.
+    requests: Option<std::sync::mpsc::Sender<Task>>,
+
+    /// The thread that is doing the work
+    thread: Option<std::thread::JoinHandle<()>>,
+}
+
+/// The default worker priority (value passed to `libc::setpriority`);
+const WORKER_PRIORITY: i32 = 10;
+
+impl std::fmt::Debug for DedicatedExecutor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let state = self.state.lock();
+
+        let mut d = f.debug_struct("DedicatedExecutor");
+
+        d.field("num_threads", &state.num_threads)
+            .field("thread_name", &state.thread_name);
+
+        if state.requests.is_some() {
+            d.field("requests", &"Some(...)")
+        } else {
+            d.field("requests", &"None")
+        };
+
+        if state.thread.is_some() {
+            d.field("thread", &"Some(...)")
+        } else {
+            d.field("thread", &"None")
+        };
+
+        d.finish()
+    }
+}
+
+impl DedicatedExecutor {
+    /// https://stackoverflow.com/questions/62536566
+    /// Creates a new `DedicatedExecutor` with a dedicated tokio
+    /// runtime that is separate from the `[tokio::main]` threadpool.
+    ///
+    /// The worker thread priority is set to low so that such tasks do
+    /// not starve other more important tasks (such as answering health checks)
+    ///
+    pub fn new(thread_name: impl Into<String>, num_threads: usize) -> Self {
+        let thread_name = thread_name.into();
+        let name_copy = thread_name.to_string();
+
+        let (tx, rx) = std::sync::mpsc::channel();
+
+        // Cannot create a separate tokio runtime inside another tokio runtime,
+        // so use std::thread to spawn a thread for it
+        let thread = std::thread::spawn(move || {
+            let runtime = tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .thread_name(&name_copy)
+                .worker_threads(num_threads)
+                .on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY))
+                .build()
+                .expect("Creating tokio runtime");
+
+            // By entering the context, all calls to `tokio::spawn` go
+            // to this executor
+            let _guard = runtime.enter();
+
+            while let Ok(request) = rx.recv() {
+                // TODO feedback request status
+                tokio::task::spawn(request);
+            }
+        });
+
+        let state = State {
+            num_threads,
+            thread_name,
+            requests: Some(tx),
+            thread: Some(thread),
+        };
+
+        Self {
+            state: Arc::new(Mutex::new(state)),
+        }
+    }
+
+    /// Runs the specified Future (and any tasks it spawns) on the
+    /// `DedicatedExecutor`.
+    ///
+    /// Currently all tasks are added to the tokio executor
+    /// immediately and compete for the threadpool's resources.
+    pub fn spawn<T>(&self, task: T) -> Receiver<T::Output>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+
+        // wrap the task so that its output is sent to the returned receiver
+        let job = Box::pin(async move {
+            let task_output = task.await;
+            if tx.send(task_output).is_err() {
+                warn!("Spawned task output ignored: receiver dropped");
+            }
+        });
+
+        let mut state = self.state.lock();
+
+        if let Some(requests) = &mut state.requests {
+            // would fail if someone has started shutdown
+            requests.send(job).ok();
+        } else {
+            warn!("tried to schedule task on an executor that was shutdown");
+        }
+
+        rx
+    }
+
+    /// signals shutdown of this executor and any Clones
+    #[allow(dead_code)]

Review comment:
       I wonder if this `#[allow(dead_code)]` is really necessary?

##########
File path: ballista/rust/executor/src/cpu_bound_executor.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Inspired by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+
+//! This module contains a dedicated thread pool for running "cpu
+//! intensive" workloads as query plans
+
+use log::warn;
+use parking_lot::Mutex;
+use std::{pin::Pin, sync::Arc};
+use tokio::sync::oneshot::Receiver;
+
+use futures::Future;
+
+/// The type of thing that the dedicated executor runs
+type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio runtime, e.g. to keep CPU-bound tasks (executing a DataFusion plan)
+/// apart from IO-bound tasks (heartbeats). See the blog post linked above for details.
+#[derive(Clone)]
+pub struct DedicatedExecutor {
+    state: Arc<Mutex<State>>,
+}
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio Executor
+struct State {
+    /// The number of threads in this pool
+    num_threads: usize,
+
+    /// The name of the threads for this executor
+    thread_name: String,
+
+    /// Channel for requests -- the dedicated executor takes requests
+    /// from here and runs them.
+    requests: Option<std::sync::mpsc::Sender<Task>>,
+
+    /// The thread that is doing the work
+    thread: Option<std::thread::JoinHandle<()>>,
+}
+
+/// The default worker priority (value passed to `libc::setpriority`);
+const WORKER_PRIORITY: i32 = 10;
+
+impl std::fmt::Debug for DedicatedExecutor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let state = self.state.lock();
+
+        let mut d = f.debug_struct("DedicatedExecutor");
+
+        d.field("num_threads", &state.num_threads)
+            .field("thread_name", &state.thread_name);
+
+        if state.requests.is_some() {
+            d.field("requests", &"Some(...)")
+        } else {
+            d.field("requests", &"None")
+        };
+
+        if state.thread.is_some() {
+            d.field("thread", &"Some(...)")
+        } else {
+            d.field("thread", &"None")
+        };
+
+        d.finish()
+    }
+}
+
+impl DedicatedExecutor {
+    /// https://stackoverflow.com/questions/62536566
+    /// Creates a new `DedicatedExecutor` with a dedicated tokio
+    /// runtime that is separate from the `[tokio::main]` threadpool.
+    ///
+    /// The worker thread priority is set to low so that such tasks do
+    /// not starve other more important tasks (such as answering health checks)
+    ///
+    pub fn new(thread_name: impl Into<String>, num_threads: usize) -> Self {

Review comment:
       FWIW I think @Darksonn and others have noted that a tokio `Handle` could also be used here: https://docs.rs/tokio/1.17.0/tokio/runtime/struct.Handle.html

##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -261,6 +262,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
         let executor_server = self.executor_server.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");
+            //TODO make it configurable

Review comment:
       maybe worth a ticket to track







[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1054942649


   retest this please.





[GitHub] [arrow-datafusion] Ted-Jiang edited a comment on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang edited a comment on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1050628519


   PTAL @alamb @houqp @yahoNanJing  @liukun4515 





[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1050628519


   PTAK @alamb @houqp @yahoNanJing  





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#discussion_r815543039



##########
File path: ballista/rust/executor/src/cpu_bound_executor.rs
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Inspired by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+//! This module contains a dedicated thread pool for running "cpu
+//! intensive" workloads such as DataFusion plans
+
+//! This module contains a dedicated thread pool for running "cpu
+//! intensive" workloads such as DataFusion plans
+
+use log::warn;
+use parking_lot::Mutex;
+use std::{pin::Pin, sync::Arc};
+use tokio::sync::oneshot::Receiver;
+
+use futures::Future;
+
+/// The type of thing that the dedicated executor runs
+type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
+/// them) on a separate tokio runtime
+#[derive(Clone)]
+pub struct DedicatedExecutor {

Review comment:
       Good point! I added a link to the blog post above.







[GitHub] [arrow-datafusion] Ted-Jiang edited a comment on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang edited a comment on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1050628519


   PTAL @alamb @houqp @yahoNanJing  @liukun4515 





[GitHub] [arrow-datafusion] thinkharderdev commented on a change in pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on a change in pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#discussion_r816013387



##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -261,6 +262,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
         let executor_server = self.executor_server.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");
+            //TODO make it configurable
+            let dedicated_executor = DedicatedExecutor::new("task_runner", 4);

Review comment:
       I think this should default to the number of available CPU cores. 
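One way to follow this suggestion, sketched under the assumption that an explicit setting should still take precedence: fall back to `std::thread::available_parallelism` (stable since Rust 1.59). It returns an estimate of the parallelism available to the process, which is not always the same as the number of physical cores. `cpu_pool_size` is a hypothetical helper, not part of Ballista.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical helper: a positive configured value wins; otherwise use the
/// parallelism the OS reports, falling back to 1 if it cannot be queried.
pub fn cpu_pool_size(configured: Option<usize>) -> usize {
    configured.filter(|&n| n > 0).unwrap_or_else(|| {
        thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1)
    })
}
```

The call site would then become something like `DedicatedExecutor::new("task_runner", cpu_pool_size(None))`. Filtering out 0 matters because a runtime with zero worker threads cannot run anything.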







[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1050759943


   retest this please





[GitHub] [arrow-datafusion] Ted-Jiang commented on a change in pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on a change in pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#discussion_r816398331



##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -261,6 +262,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
         let executor_server = self.executor_server.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");
+            //TODO make it configurable
+            let dedicated_executor = DedicatedExecutor::new("task_runner", 4);

Review comment:
       The default is set to 4 because `concurrent_tasks` is set to 4 in the Ballista config. We will make it configurable.
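A sketch of how the thread count could be made configurable while keeping today's default, assuming a settings struct in which the dedicated executor follows `concurrent_tasks` (4 by default, per the comment above) unless an explicit override is given. `ExecutorSettings`, `cpu_threads`, and `dedicated_threads` are hypothetical names, not Ballista's actual configuration API.

```rust
/// Hypothetical executor settings; only the `concurrent_tasks` default of 4
/// comes from the discussion above.
#[derive(Debug, Clone)]
pub struct ExecutorSettings {
    /// Number of tasks the executor runs concurrently.
    pub concurrent_tasks: usize,
    /// Optional override for the dedicated CPU-bound executor's thread count.
    pub cpu_threads: Option<usize>,
}

impl Default for ExecutorSettings {
    fn default() -> Self {
        Self { concurrent_tasks: 4, cpu_threads: None }
    }
}

impl ExecutorSettings {
    /// Threads for the dedicated executor: the override if set, otherwise one
    /// per concurrent task, and never fewer than 1.
    pub fn dedicated_threads(&self) -> usize {
        self.cpu_threads.unwrap_or(self.concurrent_tasks).max(1)
    }
}
```

The `TaskRunnerPool` could then call `DedicatedExecutor::new("task_runner", settings.dedicated_threads())` instead of hard-coding 4.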







[GitHub] [arrow-datafusion] Ted-Jiang closed pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
Ted-Jiang closed pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883


   





[GitHub] [arrow-datafusion] alamb commented on pull request #1883: Separate cpu-bound (query-execution) and IO-bound(heartbeat) to …

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1883:
URL: https://github.com/apache/arrow-datafusion/pull/1883#issuecomment-1055340214


   I'll merge this one in to keep things going, and we can adjust the default number of cores as follow-on work.

