Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/01 06:58:25 UTC

[GitHub] [arrow-datafusion] mingmwang commented on pull request #1881: add udf/udaf plugin

mingmwang commented on pull request #1881:
URL: https://github.com/apache/arrow-datafusion/pull/1881#issuecomment-1085503799


   > > It should take a FunctionRegistry now (which will be a TaskContext) at runtime. I think we should use that since we can set up the TaskContext with any preloaded functions
   > 
   > OK. I will update the code and use TaskContext to serialize and deserialize UDFs
   
   Yes, there have been several changes to SessionContext recently. The Executor no longer has a global SessionContext.
   You can have your UDF Plugin Manager load all the dynamic UDFs/UDAFs into the Executor's members. I have added a TODO note.
   
   ````
   impl Executor {
       /// Create a new executor instance
       pub fn new(
           metadata: ExecutorRegistration,
           work_dir: &str,
           runtime: Arc<RuntimeEnv>,
       ) -> Self {
           Self {
               metadata,
               work_dir: work_dir.to_owned(),
               // TODO add logic to dynamically load UDF/UDAFs libs from files
               scalar_functions: HashMap::new(),
               aggregate_functions: HashMap::new(),
               runtime,
           }
       }
   }
   
   ````
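   
   As a rough illustration of that TODO, the maps could be filled from a plugin manager when the Executor is built. This is only a minimal sketch using the standard library: `ScalarUdf`, `UdfPluginManager`, and the two-argument `Executor::new` below are hypothetical stand-ins, not the real DataFusion or Ballista types, and the step that loads libraries from files is left out.
   
   ````rust
   use std::collections::HashMap;
   use std::sync::Arc;
   
   /// Hypothetical stand-in for DataFusion's `ScalarUDF`: only a name and a
   /// one-argument implementation are modeled.
   pub struct ScalarUdf {
       pub name: String,
       pub fun: Arc<dyn Fn(i64) -> i64 + Send + Sync>,
   }
   
   /// Hypothetical plugin manager. A real one would discover functions by
   /// loading shared libraries from files; this sketch is handed functions
   /// that are already constructed.
   #[derive(Default)]
   pub struct UdfPluginManager {
       scalar_udfs: Vec<Arc<ScalarUdf>>,
   }
   
   impl UdfPluginManager {
       /// Record one scalar function provided by a plugin.
       pub fn add_scalar(&mut self, udf: ScalarUdf) {
           self.scalar_udfs.push(Arc::new(udf));
       }
   
       /// Build the name -> function map used to fill `scalar_functions`.
       pub fn scalar_functions(&self) -> HashMap<String, Arc<ScalarUdf>> {
           self.scalar_udfs
               .iter()
               .map(|udf| (udf.name.clone(), Arc::clone(udf)))
               .collect()
       }
   }
   
   /// Simplified executor: only the fields relevant to this sketch are kept.
   pub struct Executor {
       pub work_dir: String,
       pub scalar_functions: HashMap<String, Arc<ScalarUdf>>,
   }
   
   impl Executor {
       /// Create an executor whose function map is preloaded from the plugins.
       pub fn new(work_dir: &str, plugins: &UdfPluginManager) -> Self {
           Self {
               work_dir: work_dir.to_owned(),
               scalar_functions: plugins.scalar_functions(),
           }
       }
   }
   ````
   
   With this shape, the functions are loaded once at executor startup and looked up by name afterwards, which matches the idea of a registry with preloaded functions.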
   
   On the Ballista Scheduler side, there is no global SessionContext either; a SessionContext is created for each user request.
   You can add the UDF Plugin Manager to the Ballista SchedulerServer, and when a new session context is created, you can
   register the UDFs/UDAFs with that session context.
   
   
   ````
   /// Create a DataFusion session context that is compatible with Ballista Configuration
   pub fn create_datafusion_context(
       config: &BallistaConfig,
       session_builder: SessionBuilder,
   ) -> Arc<SessionContext> {
       let config = SessionConfig::new()
           .with_target_partitions(config.default_shuffle_partitions())
           .with_batch_size(config.default_batch_size())
           .with_repartition_joins(config.repartition_joins())
           .with_repartition_aggregations(config.repartition_aggregations())
           .with_repartition_windows(config.repartition_windows())
           .with_parquet_pruning(config.parquet_pruning());
       let session_state = session_builder(config);
       // TODO: add logic to register UDFs/UDAFs to the context before returning it.
       Arc::new(SessionContext::with_state(session_state))
   }
   
   ````
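   
   The per-request registration on the scheduler side could look roughly like the following. Again, this is a sketch under assumptions: `ScalarUdf`, `SessionContext`, and `SchedulerServer` here are simplified stand-ins written with the standard library only, and `plugin_udfs` and `create_session_context` are hypothetical names chosen for illustration.
   
   ````rust
   use std::collections::HashMap;
   use std::sync::Arc;
   
   /// Hypothetical stand-in for a scalar UDF (name plus implementation).
   #[derive(Clone)]
   pub struct ScalarUdf {
       pub name: String,
       pub fun: Arc<dyn Fn(i64) -> i64 + Send + Sync>,
   }
   
   /// Simplified stand-in for a session context that keeps its own registry.
   #[derive(Default)]
   pub struct SessionContext {
       scalar_functions: HashMap<String, ScalarUdf>,
   }
   
   impl SessionContext {
       /// Register a function under its name, replacing any previous entry.
       pub fn register_udf(&mut self, udf: ScalarUdf) {
           self.scalar_functions.insert(udf.name.clone(), udf);
       }
   
       /// Look up a registered function by name.
       pub fn udf(&self, name: &str) -> Option<&ScalarUdf> {
           self.scalar_functions.get(name)
       }
   }
   
   /// Hypothetical scheduler that holds the plugin functions loaded at startup.
   pub struct SchedulerServer {
       plugin_udfs: Vec<ScalarUdf>,
   }
   
   impl SchedulerServer {
       pub fn new(plugin_udfs: Vec<ScalarUdf>) -> Self {
           Self { plugin_udfs }
       }
   
       /// Called once per user request: build a fresh context and register
       /// every plugin function with it before handing it out.
       pub fn create_session_context(&self) -> Arc<SessionContext> {
           let mut ctx = SessionContext::default();
           for udf in &self.plugin_udfs {
               ctx.register_udf(udf.clone());
           }
           Arc::new(ctx)
       }
   }
   ````
   
   Because the functions live on the scheduler and are copied into each new context, every session sees the same plugin functions without needing a global context.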
   

