You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/07 20:17:51 UTC
[arrow-datafusion] branch master updated: add ballista plugin manager and udf plugin (#2131)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 70f2b1a9b add ballista plugin manager and udf plugin (#2131)
70f2b1a9b is described below
commit 70f2b1a9bd85373fabaae56aa7ea60da6e3dace9
Author: gaojun2048 <ga...@gmail.com>
AuthorDate: Fri Apr 8 04:17:46 2022 +0800
add ballista plugin manager and udf plugin (#2131)
---
ballista/rust/core/Cargo.toml | 5 +
ballista/rust/core/build.rs | 2 +
ballista/rust/core/src/config.rs | 26 +++-
ballista/rust/core/src/lib.rs | 2 +
ballista/rust/core/src/plugin/mod.rs | 127 +++++++++++++++++
ballista/rust/core/src/plugin/plugin_manager.rs | 150 ++++++++++++++++++++
ballista/rust/core/src/plugin/udf.rs | 152 +++++++++++++++++++++
ballista/rust/executor/executor_config_spec.toml | 6 +
ballista/rust/scheduler/scheduler_config_spec.toml | 8 +-
9 files changed, 476 insertions(+), 2 deletions(-)
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 800ed53be..8e2768341 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -42,7 +42,10 @@ datafusion = { path = "../../../datafusion/core", version = "7.0.0" }
datafusion-proto = { path = "../../../datafusion/proto", version = "7.0.0" }
futures = "0.3"
hashbrown = "0.12"
+
+libloading = "0.7.3"
log = "0.4"
+once_cell = "1.9.0"
parking_lot = "0.12"
parse_arg = "0.1.3"
@@ -53,9 +56,11 @@ sqlparser = "0.16"
tokio = "1.0"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
+walkdir = "2.3.2"
[dev-dependencies]
tempfile = "3"
[build-dependencies]
+rustc_version = "0.4.0"
tonic-build = { version = "0.6" }
diff --git a/ballista/rust/core/build.rs b/ballista/rust/core/build.rs
index b5110f8f5..c2acde108 100644
--- a/ballista/rust/core/build.rs
+++ b/ballista/rust/core/build.rs
@@ -20,6 +20,8 @@ fn main() -> Result<(), String> {
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
println!("cargo:rerun-if-changed=proto/ballista.proto");
+ let version = rustc_version::version().unwrap();
+ println!("cargo:rustc-env=RUSTC_VERSION={}", version);
println!("cargo:rerun-if-changed=proto/datafusion.proto");
tonic_build::configure()
.extern_path(".datafusion", "::datafusion_proto::protobuf")
diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs
index fffe0ead3..1ff02c16d 100644
--- a/ballista/rust/core/src/config.rs
+++ b/ballista/rust/core/src/config.rs
@@ -34,6 +34,8 @@ pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggreg
pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
+/// give a plugin files dir, and then the dynamic library files in this dir will be load when scheduler state init.
+pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir";
pub type ParseResult<T> = result::Result<T, String>;
@@ -139,6 +141,9 @@ impl BallistaConfig {
.parse::<bool>()
.map_err(|e| format!("{:?}", e))?;
}
+ DataType::Utf8 => {
+ val.to_string();
+ }
_ => {
return Err(format!("not support data type: {}", data_type));
}
@@ -171,6 +176,9 @@ impl BallistaConfig {
ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
"Sets whether enable information_schema".to_string(),
DataType::Boolean,Some("false".to_string())),
+ ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
+ "Sets the plugin dir".to_string(),
+ DataType::Utf8,Some("".to_string())),
];
entries
.iter()
@@ -186,6 +194,10 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS)
}
+ pub fn default_plugin_dir(&self) -> String {
+ self.get_string_setting(BALLISTA_PLUGIN_DIR)
+ }
+
pub fn default_batch_size(&self) -> usize {
self.get_usize_setting(BALLISTA_DEFAULT_BATCH_SIZE)
}
@@ -233,6 +245,17 @@ impl BallistaConfig {
v.parse::<bool>().unwrap()
}
}
+ fn get_string_setting(&self, key: &str) -> String {
+ if let Some(v) = self.settings.get(key) {
+ // infallible because we validate all configs in the constructor
+ v.to_string()
+ } else {
+ let entries = Self::valid_entries();
+ // infallible because we validate all configs in the constructor
+ let v = entries.get(key).unwrap().default_value.as_ref().unwrap();
+ v.to_string()
+ }
+ }
}
// an enum used to configure the scheduler policy
@@ -266,6 +289,7 @@ mod tests {
let config = BallistaConfig::new()?;
assert_eq!(2, config.default_shuffle_partitions());
assert!(!config.default_with_information_schema());
+ assert_eq!("", config.default_plugin_dir().as_str());
Ok(())
}
@@ -284,6 +308,7 @@ mod tests {
fn custom_config_invalid() -> Result<()> {
let config = BallistaConfig::builder()
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true")
+ .set(BALLISTA_PLUGIN_DIR, "test_dir")
.build();
assert!(config.is_err());
assert_eq!("General(\"Failed to parse user-supplied value 'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err()));
@@ -293,7 +318,6 @@ mod tests {
.build();
assert!(config.is_err());
assert_eq!("General(\"Failed to parse user-supplied value 'ballista.with_information_schema' for configuration setting '123': ParseBoolError\")", format!("{:?}", config.unwrap_err()));
-
Ok(())
}
}
diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs
index c452a45b1..34f4699e1 100644
--- a/ballista/rust/core/src/lib.rs
+++ b/ballista/rust/core/src/lib.rs
@@ -27,6 +27,8 @@ pub mod config;
pub mod error;
pub mod event_loop;
pub mod execution_plans;
+/// some plugins
+pub mod plugin;
pub mod utils;
#[macro_use]
diff --git a/ballista/rust/core/src/plugin/mod.rs b/ballista/rust/core/src/plugin/mod.rs
new file mode 100644
index 000000000..a7012af47
--- /dev/null
+++ b/ballista/rust/core/src/plugin/mod.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::plugin::udf::UDFPluginManager;
+use libloading::Library;
+use std::any::Any;
+use std::env;
+use std::sync::Arc;
+
+/// plugin manager
+pub mod plugin_manager;
+/// udf plugin
+pub mod udf;
+
+/// CARGO_PKG_VERSION
+pub static CORE_VERSION: &str = env!("CARGO_PKG_VERSION");
+/// RUSTC_VERSION
+pub static RUSTC_VERSION: &str = env!("RUSTC_VERSION");
+
+/// Top plugin trait
+pub trait Plugin {
+ /// Returns the plugin as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+}
+
+/// The enum of Plugin
+#[derive(PartialEq, std::cmp::Eq, std::hash::Hash, Copy, Clone)]
+pub enum PluginEnum {
+ /// UDF/UDAF plugin
+ UDF,
+}
+
+impl PluginEnum {
+ /// new a struct which impl the PluginRegistrar trait
+ pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
+ match self {
+ PluginEnum::UDF => Box::new(UDFPluginManager::default()),
+ }
+ }
+}
+
+/// Every plugin need a PluginDeclaration
+#[derive(Copy, Clone)]
+pub struct PluginDeclaration {
+ /// Rust doesn’t have a stable ABI, meaning different compiler versions can generate incompatible code.
+ /// For these reasons, the UDF plug-in must be compiled using the same version of rustc as datafusion.
+ pub rustc_version: &'static str,
+
+ /// core version of the plugin. The plugin's core_version need same as plugin manager.
+ pub core_version: &'static str,
+
+ /// One of PluginEnum
+ pub plugin_type: unsafe extern "C" fn() -> PluginEnum,
+}
+
+/// Plugin Registrar , Every plugin need implement this trait
+pub trait PluginRegistrar: Send + Sync + 'static {
+ /// # Safety
+ /// load plugin from library
+ unsafe fn load(&mut self, library: Arc<Library>) -> Result<()>;
+
+ /// Returns the plugin as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+}
+
+/// Declare a plugin's PluginDeclaration.
+///
+/// # Notes
+///
+/// This works by automatically generating an `extern "C"` function named `get_plugin_type` with a
+/// pre-defined signature and symbol name. And then generating a PluginDeclaration.
+/// Therefore you will only be able to declare one plugin per library.
+#[macro_export]
+macro_rules! declare_plugin {
+ ($plugin_type:expr) => {
+ #[no_mangle]
+ pub extern "C" fn get_plugin_type() -> $crate::plugin::PluginEnum {
+ $plugin_type
+ }
+
+ #[no_mangle]
+ pub static plugin_declaration: $crate::plugin::PluginDeclaration =
+ $crate::plugin::PluginDeclaration {
+ rustc_version: $crate::plugin::RUSTC_VERSION,
+ core_version: $crate::plugin::CORE_VERSION,
+ plugin_type: get_plugin_type,
+ };
+ };
+}
+
+/// get the plugin dir
+pub fn plugin_dir() -> String {
+ let current_exe_dir = match env::current_exe() {
+ Ok(exe_path) => exe_path.display().to_string(),
+ Err(_e) => "".to_string(),
+ };
+
+ // If current_exe_dir contain `deps` the root dir is the parent dir
+ // eg: /Users/xxx/workspace/rust/rust_plugin_sty/target/debug/deps/plugins_app-067452b3ff2af70e
+ // the plugin dir is /Users/xxx/workspace/rust/rust_plugin_sty/target/debug/deps
+ // else eg: /Users/xxx/workspace/rust/rust_plugin_sty/target/debug/plugins_app
+ // the plugin dir is /Users/xxx/workspace/rust/rust_plugin_sty/target/debug/
+ if current_exe_dir.contains("/deps/") {
+ let i = current_exe_dir.find("/deps/").unwrap();
+ String::from(¤t_exe_dir.as_str()[..i + 6])
+ } else {
+ let i = current_exe_dir.rfind('/').unwrap();
+ String::from(¤t_exe_dir.as_str()[..i])
+ }
+}
diff --git a/ballista/rust/core/src/plugin/plugin_manager.rs b/ballista/rust/core/src/plugin/plugin_manager.rs
new file mode 100644
index 000000000..e238383b4
--- /dev/null
+++ b/ballista/rust/core/src/plugin/plugin_manager.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use crate::error::{BallistaError, Result};
+use libloading::Library;
+use log::info;
+use std::collections::HashMap;
+use std::io;
+use std::sync::{Arc, Mutex};
+use walkdir::{DirEntry, WalkDir};
+
+use crate::plugin::{
+ PluginDeclaration, PluginEnum, PluginRegistrar, CORE_VERSION, RUSTC_VERSION,
+};
+use once_cell::sync::OnceCell;
+
+/// To prevent the library from being loaded multiple times, we use once_cell defines a Arc<Mutex<GlobalPluginManager>>
+/// Because datafusion is a library, not a service, users may not need to load all plug-ins in the process.
+/// So fn global_plugin_manager return Arc<Mutex<GlobalPluginManager>>. In this way, users can load the required library through the load method of GlobalPluginManager when needed
+static INSTANCE: OnceCell<Arc<Mutex<GlobalPluginManager>>> = OnceCell::new();
+
+/// global_plugin_manager
+pub fn global_plugin_manager(
+ plugin_path: &str,
+) -> &'static Arc<Mutex<GlobalPluginManager>> {
+ INSTANCE.get_or_init(move || unsafe {
+ let mut gpm = GlobalPluginManager::default();
+ gpm.load(plugin_path).unwrap();
+ Arc::new(Mutex::new(gpm))
+ })
+}
+
+#[derive(Default)]
+/// manager all plugin_type's plugin_manager
+pub struct GlobalPluginManager {
+ /// every plugin need a plugin registrar
+ pub plugin_managers: HashMap<PluginEnum, Box<dyn PluginRegistrar>>,
+
+ /// loaded plugin files
+ pub plugin_files: Vec<String>,
+}
+
+impl GlobalPluginManager {
+ /// # Safety
+ /// find plugin file from `plugin_path` and load it .
+ unsafe fn load(&mut self, plugin_path: &str) -> Result<()> {
+ if "".eq(plugin_path) {
+ return Ok(());
+ }
+ // find library file from udaf_plugin_path
+ info!("load plugin from dir:{}", plugin_path);
+
+ let plugin_files = self.get_all_plugin_files(plugin_path)?;
+
+ for plugin_file in plugin_files {
+ let library = Library::new(plugin_file.path()).map_err(|e| {
+ BallistaError::IoError(io::Error::new(
+ io::ErrorKind::Other,
+ format!("load library error: {}", e),
+ ))
+ })?;
+
+ let library = Arc::new(library);
+
+ let dec = library.get::<*mut PluginDeclaration>(b"plugin_declaration\0");
+ if dec.is_err() {
+ info!(
+ "not found plugin_declaration in the library: {}",
+ plugin_file.path().to_str().unwrap()
+ );
+ continue;
+ }
+
+ let dec = dec.unwrap().read();
+
+ // ersion checks to prevent accidental ABI incompatibilities
+ if dec.rustc_version != RUSTC_VERSION || dec.core_version != CORE_VERSION {
+ return Err(BallistaError::IoError(io::Error::new(
+ io::ErrorKind::Other,
+ "Version mismatch",
+ )));
+ }
+
+ let plugin_enum = (dec.plugin_type)();
+ let curr_plugin_manager = match self.plugin_managers.get_mut(&plugin_enum) {
+ None => {
+ let plugin_manager = plugin_enum.init_plugin_manager();
+ self.plugin_managers.insert(plugin_enum, plugin_manager);
+ self.plugin_managers.get_mut(&plugin_enum).unwrap()
+ }
+ Some(manager) => manager,
+ };
+ curr_plugin_manager.load(library)?;
+ self.plugin_files
+ .push(plugin_file.path().to_str().unwrap().to_string());
+ }
+
+ Ok(())
+ }
+
+ /// get all plugin file in the dir
+ fn get_all_plugin_files(&self, plugin_path: &str) -> io::Result<Vec<DirEntry>> {
+ let mut plugin_files = Vec::new();
+ for entry in WalkDir::new(plugin_path).into_iter().filter_map(|e| {
+ let item = e.unwrap();
+ // every file only load once
+ if self
+ .plugin_files
+ .contains(&item.path().to_str().unwrap().to_string())
+ {
+ return None;
+ }
+
+ let file_type = item.file_type();
+ if !file_type.is_file() {
+ return None;
+ }
+
+ if let Some(path) = item.path().extension() {
+ if let Some(suffix) = path.to_str() {
+ if suffix == "dylib" || suffix == "so" || suffix == "dll" {
+ info!(
+ "load plugin from library file:{}",
+ item.path().to_str().unwrap()
+ );
+ return Some(item);
+ }
+ }
+ }
+
+ None
+ }) {
+ plugin_files.push(entry);
+ }
+ Ok(plugin_files)
+ }
+}
diff --git a/ballista/rust/core/src/plugin/udf.rs b/ballista/rust/core/src/plugin/udf.rs
new file mode 100644
index 000000000..ea82742fb
--- /dev/null
+++ b/ballista/rust/core/src/plugin/udf.rs
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use crate::error::{BallistaError, Result};
+use crate::plugin::plugin_manager::global_plugin_manager;
+use crate::plugin::{Plugin, PluginEnum, PluginRegistrar};
+use datafusion::physical_plan::udaf::AggregateUDF;
+use datafusion::physical_plan::udf::ScalarUDF;
+use libloading::{Library, Symbol};
+use std::any::Any;
+use std::collections::HashMap;
+use std::io;
+use std::sync::Arc;
+
+/// UDF plugin trait
+pub trait UDFPlugin: Plugin {
+ /// get a ScalarUDF by name
+ fn get_scalar_udf_by_name(&self, fun_name: &str) -> Result<ScalarUDF>;
+
+ /// return all udf names in the plugin
+ fn udf_names(&self) -> Result<Vec<String>>;
+
+ /// get a aggregate udf by name
+ fn get_aggregate_udf_by_name(&self, fun_name: &str) -> Result<AggregateUDF>;
+
+ /// return all udaf names
+ fn udaf_names(&self) -> Result<Vec<String>>;
+}
+
+/// UDFPluginManager
+#[derive(Default, Clone)]
+pub struct UDFPluginManager {
+ /// scalar udfs
+ pub scalar_udfs: HashMap<String, Arc<ScalarUDF>>,
+
+ /// aggregate udfs
+ pub aggregate_udfs: HashMap<String, Arc<AggregateUDF>>,
+
+ /// All libraries load from the plugin dir.
+ pub libraries: Vec<Arc<Library>>,
+}
+
+impl PluginRegistrar for UDFPluginManager {
+ unsafe fn load(&mut self, library: Arc<Library>) -> Result<()> {
+ type PluginRegister = unsafe fn() -> Box<dyn UDFPlugin>;
+ let register_fun: Symbol<PluginRegister> =
+ library.get(b"registrar_udf_plugin\0").map_err(|e| {
+ BallistaError::IoError(io::Error::new(
+ io::ErrorKind::Other,
+ format!("not found fn registrar_udf_plugin in the library: {}", e),
+ ))
+ })?;
+
+ let udf_plugin: Box<dyn UDFPlugin> = register_fun();
+ udf_plugin
+ .udf_names()
+ .unwrap()
+ .iter()
+ .try_for_each(|udf_name| {
+ if self.scalar_udfs.contains_key(udf_name) {
+ Err(BallistaError::IoError(io::Error::new(
+ io::ErrorKind::Other,
+ format!("udf name: {} already exists", udf_name),
+ )))
+ } else {
+ let scalar_udf = udf_plugin.get_scalar_udf_by_name(udf_name)?;
+ self.scalar_udfs
+ .insert(udf_name.to_string(), Arc::new(scalar_udf));
+ Ok(())
+ }
+ })?;
+
+ udf_plugin
+ .udaf_names()
+ .unwrap()
+ .iter()
+ .try_for_each(|udaf_name| {
+ if self.aggregate_udfs.contains_key(udaf_name) {
+ Err(BallistaError::IoError(io::Error::new(
+ io::ErrorKind::Other,
+ format!("udaf name: {} already exists", udaf_name),
+ )))
+ } else {
+ let aggregate_udf =
+ udf_plugin.get_aggregate_udf_by_name(udaf_name)?;
+ self.aggregate_udfs
+ .insert(udaf_name.to_string(), Arc::new(aggregate_udf));
+ Ok(())
+ }
+ })?;
+ self.libraries.push(library);
+ Ok(())
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+}
+
+/// Declare a udf plugin registrar callback
+///
+/// # Notes
+///
+/// This works by automatically generating an `extern "C"` function named `registrar_udf_plugin` with a
+/// pre-defined signature and symbol name.
+/// Therefore you will only be able to declare one plugin per library.
+#[macro_export]
+macro_rules! declare_udf_plugin {
+ ($curr_plugin_type:ty, $constructor:path) => {
+ #[no_mangle]
+ pub extern "C" fn registrar_udf_plugin() -> Box<dyn $crate::plugin::udf::UDFPlugin> {
+ // make sure the constructor is the correct type.
+ let constructor: fn() -> $curr_plugin_type = $constructor;
+ let object = constructor();
+ Box::new(object)
+ }
+
+ $crate::declare_plugin!($crate::plugin::PluginEnum::UDF);
+ };
+}
+
+/// get a Option of Immutable UDFPluginManager
+pub fn get_udf_plugin_manager(path: &str) -> Option<UDFPluginManager> {
+ let udf_plugin_manager_opt = {
+ let gpm = global_plugin_manager(path).lock().unwrap();
+ let plugin_registrar_opt = gpm.plugin_managers.get(&PluginEnum::UDF);
+ if let Some(plugin_registrar) = plugin_registrar_opt {
+ if let Some(udf_plugin_manager) =
+ plugin_registrar.as_any().downcast_ref::<UDFPluginManager>()
+ {
+ return Some(udf_plugin_manager.clone());
+ } else {
+ return None;
+ }
+ }
+ None
+ };
+ udf_plugin_manager_opt
+}
diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml
index 167ec20d2..86e712bda 100644
--- a/ballista/rust/executor/executor_config_spec.toml
+++ b/ballista/rust/executor/executor_config_spec.toml
@@ -96,3 +96,9 @@ name = "executor_cleanup_ttl"
type = "u64"
doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained"
default = "604800"
+
+[[param]]
+name = "plugin_dir"
+type = "String"
+doc = "plugin dir"
+default = "std::string::String::from(\"\")"
\ No newline at end of file
diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml b/ballista/rust/scheduler/scheduler_config_spec.toml
index 000d74e7d..cca96edfa 100644
--- a/ballista/rust/scheduler/scheduler_config_spec.toml
+++ b/ballista/rust/scheduler/scheduler_config_spec.toml
@@ -64,4 +64,10 @@ abbr = "s"
name = "scheduler_policy"
type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged"
-default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
\ No newline at end of file
+default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
+
+[[param]]
+name = "plugin_dir"
+type = "String"
+doc = "plugin dir"
+default = "std::string::String::from(\"\")"
\ No newline at end of file