You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/06/28 14:21:54 UTC

[arrow-datafusion] branch master updated: Add Keda autoscaling for ballista in k8s (#586)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new eba0fcf  Add Keda autoscaling for ballista in k8s (#586)
eba0fcf is described below

commit eba0fcf1c4dcec199e6b7843c04e3be0e7a8261e
Author: Ximo Guanter <xi...@gmail.com>
AuthorDate: Mon Jun 28 16:21:40 2021 +0200

    Add Keda autoscaling for ballista in k8s (#586)
---
 ballista/rust/scheduler/Cargo.toml            |  1 +
 ballista/rust/scheduler/build.rs              |  7 ++-
 ballista/rust/scheduler/proto/keda.proto      | 63 ++++++++++++++++++++++++
 ballista/rust/scheduler/src/lib.rs            | 69 +++++++++++++++++++++++++--
 ballista/rust/scheduler/src/main.rs           |  4 ++
 ballista/rust/scheduler/src/state/mod.rs      | 28 ++++++-----
 docs/user-guide/src/distributed/kubernetes.md | 50 ++++++++++++++++++-
 7 files changed, 201 insertions(+), 21 deletions(-)

diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 215c58a..9bca8d9 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -61,6 +61,7 @@ uuid = { version = "0.8", features = ["v4"] }
 
 [build-dependencies]
 configure_me_codegen = "0.4.0"
+tonic-build = { version = "0.4" }
 
 [package.metadata.configure_me.bin]
 scheduler = "scheduler_config_spec.toml"
diff --git a/ballista/rust/scheduler/build.rs b/ballista/rust/scheduler/build.rs
index bae6a3b..e90bd49 100644
--- a/ballista/rust/scheduler/build.rs
+++ b/ballista/rust/scheduler/build.rs
@@ -20,5 +20,10 @@ extern crate configure_me_codegen;
 fn main() -> Result<(), String> {
     println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
     configure_me_codegen::build_script_auto()
-        .map_err(|e| format!("configure_me code generation failed: {}", e))
+        .map_err(|e| format!("configure_me code generation failed: {}", e))?;
+
+    println!("cargo:rerun-if-changed=proto/keda.proto");
+    tonic_build::configure()
+        .compile(&["proto/keda.proto"], &["proto"])
+        .map_err(|e| format!("protobuf compilation failed: {}", e))
 }
diff --git a/ballista/rust/scheduler/proto/keda.proto b/ballista/rust/scheduler/proto/keda.proto
new file mode 100644
index 0000000..051dd43
--- /dev/null
+++ b/ballista/rust/scheduler/proto/keda.proto
@@ -0,0 +1,63 @@
+/*
+   Copyright 2020 The KEDA Authors.
+
+   and others that have contributed code to the public domain.
+
+   Licensed under the Apache License, Version 2.0 (the "License"); 
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at.
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+// This file comes from https://github.com/kedacore/keda/blob/main/pkg/scalers/externalscaler/externalscaler.proto
+syntax = "proto3";
+
+package externalscaler;
+option go_package = ".;externalscaler";
+
+service ExternalScaler {
+    rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
+    // Commented out since we aren't supporting the streaming scaler interface at the moment
+    // rpc StreamIsActive(ScaledObjectRef) returns (stream IsActiveResponse) {}
+    rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
+    rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
+}
+
+message ScaledObjectRef {
+    string name = 1;
+    string namespace = 2;
+    map<string, string> scalerMetadata = 3;
+}
+
+message IsActiveResponse {
+    bool result = 1;
+}
+
+message GetMetricSpecResponse {
+    repeated MetricSpec metricSpecs = 1;
+}
+
+message MetricSpec {
+    string metricName = 1;
+    int64 targetSize = 2;
+}
+
+message GetMetricsRequest {
+    ScaledObjectRef scaledObjectRef = 1;
+    string metricName = 2;
+}
+
+message GetMetricsResponse {
+    repeated MetricValue metricValues = 1;
+}
+
+message MetricValue {
+    string metricName = 1;
+    int64 metricValue = 2;
+}
\ No newline at end of file
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 3620f79..3bd4c03 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -28,16 +28,22 @@ pub use standalone::new_standalone_scheduler;
 #[cfg(test)]
 pub mod test_utils;
 
+// include the generated protobuf source as a submodule
+#[allow(clippy::all)]
+pub mod externalscaler {
+    include!(concat!(env!("OUT_DIR"), "/externalscaler.rs"));
+}
+
 use std::{convert::TryInto, sync::Arc};
 use std::{fmt, net::IpAddr};
 
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
-    scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
-    FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
-    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
-    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
-    TaskStatus,
+    scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams,
+    ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
+    GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
+    JobStatus, PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob,
+    TaskDefinition, TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;
 
@@ -62,6 +68,10 @@ impl parse_arg::ParseArgFromStr for ConfigBackend {
     }
 }
 
+use crate::externalscaler::{
+    external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
+    GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
+};
 use crate::planner::DistributedPlanner;
 
 use log::{debug, error, info, warn};
@@ -103,6 +113,55 @@ impl SchedulerServer {
     }
 }
 
+const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
+
+#[tonic::async_trait]
+impl ExternalScaler for SchedulerServer {
+    async fn is_active(
+        &self,
+        _request: Request<ScaledObjectRef>,
+    ) -> Result<Response<IsActiveResponse>, tonic::Status> {
+        let tasks = self.state.get_all_tasks().await.map_err(|e| {
+            let msg = format!("Error reading tasks: {}", e);
+            error!("{}", msg);
+            tonic::Status::internal(msg)
+        })?;
+        let result = tasks.iter().any(|(_key, task)| {
+            !matches!(
+                task.status,
+                Some(task_status::Status::Completed(_))
+                    | Some(task_status::Status::Failed(_))
+            )
+        });
+        debug!("Are there active tasks? {}", result);
+        Ok(Response::new(IsActiveResponse { result }))
+    }
+
+    async fn get_metric_spec(
+        &self,
+        _request: Request<ScaledObjectRef>,
+    ) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
+        Ok(Response::new(GetMetricSpecResponse {
+            metric_specs: vec![MetricSpec {
+                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+                target_size: 1,
+            }],
+        }))
+    }
+
+    async fn get_metrics(
+        &self,
+        _request: Request<GetMetricsRequest>,
+    ) -> Result<Response<GetMetricsResponse>, tonic::Status> {
+        Ok(Response::new(GetMetricsResponse {
+            metric_values: vec![MetricValue {
+                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+                metric_value: 10000000, // A very high number to saturate the HPA
+            }],
+        }))
+    }
+}
+
 #[tonic::async_trait]
 impl SchedulerGrpc for SchedulerServer {
     async fn poll_work(
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 34386ca..7b79eb1 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -18,6 +18,7 @@
 //! Ballista Rust scheduler binary.
 
 use anyhow::{Context, Result};
+use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerServer;
 use futures::future::{self, Either, TryFutureExt};
 use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
@@ -72,8 +73,11 @@ async fn start_server(
             let scheduler_grpc_server =
                 SchedulerGrpcServer::new(scheduler_server.clone());
 
+            let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());
+
             let mut tonic = TonicServer::builder()
                 .add_service(scheduler_grpc_server)
+                .add_service(keda_scaler)
                 .into_service();
             let mut warp = warp::service(get_routes(scheduler_server));
 
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index a17c82d..cbee3f1 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -236,6 +236,15 @@ impl SchedulerState {
         Ok((&value).try_into()?)
     }
 
+    pub async fn get_all_tasks(&self) -> Result<HashMap<String, TaskStatus>> {
+        self.config_client
+            .get_from_prefix(&get_task_prefix(&self.namespace))
+            .await?
+            .into_iter()
+            .map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?)))
+            .collect()
+    }
+
     /// This function ensures that the task wasn't assigned to an executor that died.
     /// If that is the case, then the task is re-scheduled.
     /// Returns true if the task was dead, false otherwise.
@@ -274,18 +283,12 @@ impl SchedulerState {
         &self,
         executor_id: &str,
     ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
-        let kvs: HashMap<String, Vec<u8>> = self
-            .config_client
-            .get_from_prefix(&get_task_prefix(&self.namespace))
-            .await?
-            .into_iter()
-            .collect();
+        let tasks = self.get_all_tasks().await?;
         // TODO: Make the duration a configurable parameter
         let executors = self
             .get_alive_executors_metadata(Duration::from_secs(60))
             .await?;
-        'tasks: for (_key, value) in kvs.iter() {
-            let mut status: TaskStatus = decode_protobuf(value)?;
+        'tasks: for (_key, status) in tasks.iter() {
             if status.status.is_none() {
                 let partition = status.partition_id.as_ref().unwrap();
                 let plan = self
@@ -301,7 +304,7 @@ impl SchedulerState {
                 for unresolved_shuffle in unresolved_shuffles {
                     for stage_id in unresolved_shuffle.query_stage_ids {
                         for partition_id in 0..unresolved_shuffle.partition_count {
-                            let referenced_task = kvs
+                            let referenced_task = tasks
                                 .get(&get_task_status_key(
                                     &self.namespace,
                                     &partition.job_id,
@@ -309,8 +312,6 @@ impl SchedulerState {
                                     partition_id,
                                 ))
                                 .unwrap();
-                            let referenced_task: TaskStatus =
-                                decode_protobuf(referenced_task)?;
                             let task_is_dead = self
                                 .reschedule_dead_task(&referenced_task, &executors)
                                 .await?;
@@ -318,14 +319,14 @@ impl SchedulerState {
                                 continue 'tasks;
                             } else if let Some(task_status::Status::Completed(
                                 CompletedTask { executor_id },
-                            )) = referenced_task.status
+                            )) = &referenced_task.status
                             {
                                 let empty = vec![];
                                 let locations =
                                     partition_locations.entry(stage_id).or_insert(empty);
                                 let executor_meta = executors
                                     .iter()
-                                    .find(|exec| exec.id == executor_id)
+                                    .find(|exec| exec.id == *executor_id)
                                     .unwrap()
                                     .clone();
                                 locations.push(vec![
@@ -350,6 +351,7 @@ impl SchedulerState {
                     remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?;
 
                 // If we get here, there are no more unresolved shuffled and the task can be run
+                let mut status = status.clone();
                 status.status = Some(task_status::Status::Running(RunningTask {
                     executor_id: executor_id.to_owned(),
                 }));
diff --git a/docs/user-guide/src/distributed/kubernetes.md b/docs/user-guide/src/distributed/kubernetes.md
index 07b51f7..4b80d17 100644
--- a/docs/user-guide/src/distributed/kubernetes.md
+++ b/docs/user-guide/src/distributed/kubernetes.md
@@ -28,6 +28,7 @@ The k8s deployment consists of:
 - k8s deployment for one or more executor processes
 - k8s service to route traffic to the schedulers
 - k8s persistent volume and persistent volume claims to make local data accessible to Ballista
+- _(optional)_ a [keda](http://keda.sh) instance for autoscaling the number of executors
 
 ## Limitations
 
@@ -163,8 +164,8 @@ spec:
           image: <your-image>
           command: ["/executor"]
           args:
-            - "--bind-port=50051",
-            - "--scheduler-host=ballista-scheduler",
+            - "--bind-port=50051"
+            - "--scheduler-host=ballista-scheduler"
             - "--scheduler-port=50050"
           ports:
             - containerPort: 50051
@@ -216,3 +217,48 @@ Run the following kubectl command to delete the cluster.
 ```bash
 kubectl delete -f cluster.yaml
 ```
+
+## Adding autoscaling for executors
+
+Ballista supports autoscaling for executors through [Keda](http://keda.sh). Keda allows scaling a deployment
+through custom metrics which are exposed through the Ballista scheduler, and it can even scale the number of
+executors down to 0 if there is no activity in the cluster.
+
+Keda can be installed in your kubernetes cluster through a single command line:
+
+```bash
+kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.3.0/keda-2.3.0.yaml
+```
+
+Once you have deployed Keda on your cluster, you can now deploy a new kubernetes object called `ScaledObject`
+which will let Keda know how to scale your executors. In order to do that, copy the following YAML into a
+`scale.yaml` file:
+
+```yaml
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: ballista-executor
+spec:
+  scaleTargetRef:
+    name: ballista-executor
+  minReplicaCount: 0
+  maxReplicaCount: 5
+  triggers:
+  - type: external
+    metadata:
+      # Change this DNS if the scheduler isn't deployed in the "default" namespace
+      scalerAddress: ballista-scheduler.default.svc.cluster.local:50050
+```
+
+And then deploy it into the cluster:
+
+```bash
+kubectl apply -f scale.yaml
+```
+
+If the cluster is inactive, Keda will now scale the number of executors down to 0, and will scale them up when
+you launch a query. Please note that Keda will perform a scan once every 30 seconds, so it might take a bit to
+scale the executors.
+
+Please visit Keda's [documentation page](https://keda.sh/docs/2.3/concepts/scaling-deployments/) for more information.
\ No newline at end of file