You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/06/28 14:21:54 UTC
[arrow-datafusion] branch master updated: Add Keda autoscaling for
ballista in k8s (#586)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new eba0fcf Add Keda autoscaling for ballista in k8s (#586)
eba0fcf is described below
commit eba0fcf1c4dcec199e6b7843c04e3be0e7a8261e
Author: Ximo Guanter <xi...@gmail.com>
AuthorDate: Mon Jun 28 16:21:40 2021 +0200
Add Keda autoscaling for ballista in k8s (#586)
---
ballista/rust/scheduler/Cargo.toml | 1 +
ballista/rust/scheduler/build.rs | 7 ++-
ballista/rust/scheduler/proto/keda.proto | 63 ++++++++++++++++++++++++
ballista/rust/scheduler/src/lib.rs | 69 +++++++++++++++++++++++++--
ballista/rust/scheduler/src/main.rs | 4 ++
ballista/rust/scheduler/src/state/mod.rs | 28 ++++++-----
docs/user-guide/src/distributed/kubernetes.md | 50 ++++++++++++++++++-
7 files changed, 201 insertions(+), 21 deletions(-)
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 215c58a..9bca8d9 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -61,6 +61,7 @@ uuid = { version = "0.8", features = ["v4"] }
[build-dependencies]
configure_me_codegen = "0.4.0"
+tonic-build = { version = "0.4" }
[package.metadata.configure_me.bin]
scheduler = "scheduler_config_spec.toml"
diff --git a/ballista/rust/scheduler/build.rs b/ballista/rust/scheduler/build.rs
index bae6a3b..e90bd49 100644
--- a/ballista/rust/scheduler/build.rs
+++ b/ballista/rust/scheduler/build.rs
@@ -20,5 +20,10 @@ extern crate configure_me_codegen;
fn main() -> Result<(), String> {
println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
configure_me_codegen::build_script_auto()
- .map_err(|e| format!("configure_me code generation failed: {}", e))
+ .map_err(|e| format!("configure_me code generation failed: {}", e))?;
+
+ println!("cargo:rerun-if-changed=proto/keda.proto");
+ tonic_build::configure()
+ .compile(&["proto/keda.proto"], &["proto"])
+ .map_err(|e| format!("protobuf compilation failed: {}", e))
}
diff --git a/ballista/rust/scheduler/proto/keda.proto b/ballista/rust/scheduler/proto/keda.proto
new file mode 100644
index 0000000..051dd43
--- /dev/null
+++ b/ballista/rust/scheduler/proto/keda.proto
@@ -0,0 +1,63 @@
+/*
+ Copyright 2020 The KEDA Authors.
+
+ and others that have contributed code to the public domain.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+// This file comes from https://github.com/kedacore/keda/blob/main/pkg/scalers/externalscaler/externalscaler.proto
+syntax = "proto3";
+
+package externalscaler;
+option go_package = ".;externalscaler";
+
+service ExternalScaler {
+ rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
+ // Commented out since we aren't supporting the streaming scaler interface at the moment
+ // rpc StreamIsActive(ScaledObjectRef) returns (stream IsActiveResponse) {}
+ rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
+ rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
+}
+
+message ScaledObjectRef {
+ string name = 1;
+ string namespace = 2;
+ map<string, string> scalerMetadata = 3;
+}
+
+message IsActiveResponse {
+ bool result = 1;
+}
+
+message GetMetricSpecResponse {
+ repeated MetricSpec metricSpecs = 1;
+}
+
+message MetricSpec {
+ string metricName = 1;
+ int64 targetSize = 2;
+}
+
+message GetMetricsRequest {
+ ScaledObjectRef scaledObjectRef = 1;
+ string metricName = 2;
+}
+
+message GetMetricsResponse {
+ repeated MetricValue metricValues = 1;
+}
+
+message MetricValue {
+ string metricName = 1;
+ int64 metricValue = 2;
+}
\ No newline at end of file
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 3620f79..3bd4c03 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -28,16 +28,22 @@ pub use standalone::new_standalone_scheduler;
#[cfg(test)]
pub mod test_utils;
+// include the generated protobuf source as a submodule
+#[allow(clippy::all)]
+pub mod externalscaler {
+ include!(concat!(env!("OUT_DIR"), "/externalscaler.rs"));
+}
+
use std::{convert::TryInto, sync::Arc};
use std::{fmt, net::IpAddr};
use ballista_core::serde::protobuf::{
execute_query_params::Query, executor_registration::OptionalHost, job_status,
- scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
- FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
- GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
- PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
- TaskStatus,
+ scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams,
+ ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
+ GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
+ JobStatus, PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob,
+ TaskDefinition, TaskStatus,
};
use ballista_core::serde::scheduler::ExecutorMeta;
@@ -62,6 +68,10 @@ impl parse_arg::ParseArgFromStr for ConfigBackend {
}
}
+use crate::externalscaler::{
+ external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
+ GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
+};
use crate::planner::DistributedPlanner;
use log::{debug, error, info, warn};
@@ -103,6 +113,55 @@ impl SchedulerServer {
}
}
+const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
+
+#[tonic::async_trait]
+impl ExternalScaler for SchedulerServer {
+ async fn is_active(
+ &self,
+ _request: Request<ScaledObjectRef>,
+ ) -> Result<Response<IsActiveResponse>, tonic::Status> {
+ let tasks = self.state.get_all_tasks().await.map_err(|e| {
+ let msg = format!("Error reading tasks: {}", e);
+ error!("{}", msg);
+ tonic::Status::internal(msg)
+ })?;
+ let result = tasks.iter().any(|(_key, task)| {
+ !matches!(
+ task.status,
+ Some(task_status::Status::Completed(_))
+ | Some(task_status::Status::Failed(_))
+ )
+ });
+ debug!("Are there active tasks? {}", result);
+ Ok(Response::new(IsActiveResponse { result }))
+ }
+
+ async fn get_metric_spec(
+ &self,
+ _request: Request<ScaledObjectRef>,
+ ) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
+ Ok(Response::new(GetMetricSpecResponse {
+ metric_specs: vec![MetricSpec {
+ metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+ target_size: 1,
+ }],
+ }))
+ }
+
+ async fn get_metrics(
+ &self,
+ _request: Request<GetMetricsRequest>,
+ ) -> Result<Response<GetMetricsResponse>, tonic::Status> {
+ Ok(Response::new(GetMetricsResponse {
+ metric_values: vec![MetricValue {
+ metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+ metric_value: 10000000, // A very high number to saturate the HPA
+ }],
+ }))
+ }
+}
+
#[tonic::async_trait]
impl SchedulerGrpc for SchedulerServer {
async fn poll_work(
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 34386ca..7b79eb1 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -18,6 +18,7 @@
//! Ballista Rust scheduler binary.
use anyhow::{Context, Result};
+use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerServer;
use futures::future::{self, Either, TryFutureExt};
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
use std::convert::Infallible;
@@ -72,8 +73,11 @@ async fn start_server(
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone());
+ let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());
+
let mut tonic = TonicServer::builder()
.add_service(scheduler_grpc_server)
+ .add_service(keda_scaler)
.into_service();
let mut warp = warp::service(get_routes(scheduler_server));
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index a17c82d..cbee3f1 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -236,6 +236,15 @@ impl SchedulerState {
Ok((&value).try_into()?)
}
+ pub async fn get_all_tasks(&self) -> Result<HashMap<String, TaskStatus>> {
+ self.config_client
+ .get_from_prefix(&get_task_prefix(&self.namespace))
+ .await?
+ .into_iter()
+ .map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?)))
+ .collect()
+ }
+
/// This function ensures that the task wasn't assigned to an executor that died.
/// If that is the case, then the task is re-scheduled.
/// Returns true if the task was dead, false otherwise.
@@ -274,18 +283,12 @@ impl SchedulerState {
&self,
executor_id: &str,
) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
- let kvs: HashMap<String, Vec<u8>> = self
- .config_client
- .get_from_prefix(&get_task_prefix(&self.namespace))
- .await?
- .into_iter()
- .collect();
+ let tasks = self.get_all_tasks().await?;
// TODO: Make the duration a configurable parameter
let executors = self
.get_alive_executors_metadata(Duration::from_secs(60))
.await?;
- 'tasks: for (_key, value) in kvs.iter() {
- let mut status: TaskStatus = decode_protobuf(value)?;
+ 'tasks: for (_key, status) in tasks.iter() {
if status.status.is_none() {
let partition = status.partition_id.as_ref().unwrap();
let plan = self
@@ -301,7 +304,7 @@ impl SchedulerState {
for unresolved_shuffle in unresolved_shuffles {
for stage_id in unresolved_shuffle.query_stage_ids {
for partition_id in 0..unresolved_shuffle.partition_count {
- let referenced_task = kvs
+ let referenced_task = tasks
.get(&get_task_status_key(
&self.namespace,
&partition.job_id,
@@ -309,8 +312,6 @@ impl SchedulerState {
partition_id,
))
.unwrap();
- let referenced_task: TaskStatus =
- decode_protobuf(referenced_task)?;
let task_is_dead = self
.reschedule_dead_task(&referenced_task, &executors)
.await?;
@@ -318,14 +319,14 @@ impl SchedulerState {
continue 'tasks;
} else if let Some(task_status::Status::Completed(
CompletedTask { executor_id },
- )) = referenced_task.status
+ )) = &referenced_task.status
{
let empty = vec![];
let locations =
partition_locations.entry(stage_id).or_insert(empty);
let executor_meta = executors
.iter()
- .find(|exec| exec.id == executor_id)
+ .find(|exec| exec.id == *executor_id)
.unwrap()
.clone();
locations.push(vec![
@@ -350,6 +351,7 @@ impl SchedulerState {
remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?;
// If we get here, there are no more unresolved shuffles and the task can be run
+ let mut status = status.clone();
status.status = Some(task_status::Status::Running(RunningTask {
executor_id: executor_id.to_owned(),
}));
diff --git a/docs/user-guide/src/distributed/kubernetes.md b/docs/user-guide/src/distributed/kubernetes.md
index 07b51f7..4b80d17 100644
--- a/docs/user-guide/src/distributed/kubernetes.md
+++ b/docs/user-guide/src/distributed/kubernetes.md
@@ -28,6 +28,7 @@ The k8s deployment consists of:
- k8s deployment for one or more executor processes
- k8s service to route traffic to the schedulers
- k8s persistent volume and persistent volume claims to make local data accessible to Ballista
+- _(optional)_ a [Keda](http://keda.sh) instance for autoscaling the number of executors
## Limitations
@@ -163,8 +164,8 @@ spec:
image: <your-image>
command: ["/executor"]
args:
- - "--bind-port=50051",
- - "--scheduler-host=ballista-scheduler",
+ - "--bind-port=50051"
+ - "--scheduler-host=ballista-scheduler"
- "--scheduler-port=50050"
ports:
- containerPort: 50051
@@ -216,3 +217,48 @@ Run the following kubectl command to delete the cluster.
```bash
kubectl delete -f cluster.yaml
```
+
+## Adding autoscaling for executors
+
+Ballista supports autoscaling for executors through [Keda](http://keda.sh). Keda allows scaling a deployment
+through custom metrics which are exposed through the Ballista scheduler, and it can even scale the number of
+executors down to 0 if there is no activity in the cluster.
+
+Keda can be installed in your Kubernetes cluster with a single command:
+
+```bash
+kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.3.0/keda-2.3.0.yaml
+```
+
+Once you have deployed Keda on your cluster, you can deploy a new Kubernetes object called `ScaledObject`,
+which tells Keda how to scale your executors. To do that, copy the following YAML into a
+`scale.yaml` file:
+
+```yaml
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+ name: ballista-executor
+spec:
+ scaleTargetRef:
+ name: ballista-executor
+ minReplicaCount: 0
+ maxReplicaCount: 5
+ triggers:
+ - type: external
+ metadata:
+ # Change this DNS name if the scheduler isn't deployed in the "default" namespace
+ scalerAddress: ballista-scheduler.default.svc.cluster.local:50050
+```
+
+And then deploy it into the cluster:
+
+```bash
+kubectl apply -f scale.yaml
+```
+
+If the cluster is inactive, Keda will now scale the number of executors down to 0, and will scale them back up when
+you launch a query. Please note that Keda only performs a scan once every 30 seconds, so it may take a short
+while before the executors are scaled.
+
+Please visit Keda's [documentation page](https://keda.sh/docs/2.3/concepts/scaling-deployments/) for more information.
\ No newline at end of file