You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/26 02:29:04 UTC

[iotdb] branch master updated: [IOTDB-3227] UDF Management in MPP Cluster: CREATE FUNCTION (#6014)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d7d69709f1 [IOTDB-3227] UDF Management in MPP Cluster: CREATE FUNCTION (#6014)
d7d69709f1 is described below

commit d7d69709f11f3293a9a2da109c29fe89eb0ff041
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 26 10:28:59 2022 +0800

    [IOTDB-3227] UDF Management in MPP Cluster: CREATE FUNCTION (#6014)
    
    New Syntax: CREATE FUNCTION function-name AS 'class-name' [USING 'uri' (, 'uri')*]
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   6 +-
 .../resources/conf/iotdb-confignode.properties     |  28 ++-
 .../confignode/client/AsyncDataNodeClientPool.java |  16 ++
 .../client/handlers/CreateFunctionHandler.java     |  58 ++++++
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  36 +++-
 .../confignode/conf/ConfigNodeDescriptor.java      |   5 +-
 .../consensus/request/ConfigRequest.java           |   4 +
 .../consensus/request/ConfigRequestType.java       |   1 +
 .../consensus/request/write/CreateFunctionReq.java |  85 +++++++++
 .../iotdb/confignode/manager/ConfigManager.java    |  21 ++-
 .../apache/iotdb/confignode/manager/Manager.java   |   9 +
 .../iotdb/confignode/manager/UDFManager.java       | 119 +++++++++++++
 .../iotdb/confignode/persistence/UDFInfo.java      | 117 ++++++++++++
 .../executor/ConfigRequestExecutor.java            |  11 +-
 .../iotdb/confignode/service/ConfigNode.java       |  18 ++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   6 +
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   1 +
 .../apache/iotdb/commons/service/ServiceType.java  |   1 +
 .../iotdb/commons/udf/service/SnapshotUtils.java   |  96 ++++++++++
 .../commons/udf/service/UDFClassLoaderManager.java |   8 +
 .../commons/udf/service/UDFExecutableManager.java  | 196 +++++++++++++++++++++
 .../commons/udf/service/UDFExecutableResource.java |  39 ++++
 .../iotdb/commons/udf/service/UDFLogWriter.java    |  21 ++-
 .../udf/service/UDFRegistrationInformation.java    |  13 +-
 .../udf/service/UDFRegistrationService.java        | 127 +++++++++++--
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  17 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   9 +
 .../plan/execution/config/ConfigTaskVisitor.java   |   8 +
 .../plan/execution/config/CreateFunctionTask.java  | 117 ++++++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  29 +++
 .../db/mpp/plan/statement/StatementVisitor.java    |   6 +
 .../metadata/CreateFunctionStatement.java          |  73 ++++++++
 .../java/org/apache/iotdb/db/service/DataNode.java |  30 ++--
 .../service/thrift/impl/InternalServiceImpl.java   |  20 +++
 .../src/main/thrift/confignode.thrift              |  11 ++
 thrift/src/main/thrift/mpp.thrift                  |  15 +-
 36 files changed, 1329 insertions(+), 48 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 73e6a76121..326eac60b0 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -115,7 +115,11 @@ createTimeseriesOfSchemaTemplate
 
 // Create Function
 createFunction
-    : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL
+    : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL (USING uri (COMMA uri)*)?
+    ;
+
+uri
+    : STRING_LITERAL
     ;
 
 // Create Trigger
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index b16a399ec3..8fb85749b6 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -212,9 +212,9 @@ target_confignode=0.0.0.0:22277
 # proc_wal_dir=data/proc
 
 
-# UDF dir
+# UDF lib dir
 # If this property is unset, system will save the data in the default relative path directory under
-# the UDF folder(i.e., %CONFIGNODE_HOME%/data/udf).
+# the UDF folder(i.e., %CONFIGNODE_HOME%/ext/udf).
 #
 # If it is absolute, system will save the data in exact location it points to.
 # If it is relative, system will save the data in the relative path directory it indicates under the
@@ -225,11 +225,31 @@ target_confignode=0.0.0.0:22277
 # For Window platform
 # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
 # absolute. Otherwise, it is relative.
-# udf_dir=data\\udf
+# udf_lib_dir=ext\\udf
 #
 # For Linux platform
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
-# udf_dir=data/udf
+# udf_lib_dir=ext/udf
+
+
+# Temporary lib dir (stores downloaded JAR files)
+# If this property is unset, the system will save the data in the default relative path directory
+# (i.e., %CONFIGNODE_HOME%/ext/temporary).
+#
+# If it is absolute, the system will save the data in the exact location it points to.
+# If it is relative, the system will save the data in the relative path directory it indicates
+# under %CONFIGNODE_HOME%.
+# Note: If temporary_lib_dir is assigned an empty string (i.e., zero-size), it will be handled as
+# a relative path.
+#
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# temporary_lib_dir=ext\\temporary
+#
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# temporary_lib_dir=ext/temporary
 
 
 ####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index 96d2fe5f1e..17d93b8f85 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -25,10 +25,12 @@ import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
 import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
 import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 
 import org.apache.thrift.TException;
@@ -233,6 +235,26 @@ public class AsyncDataNodeClientPool {
     clientManager.clear(endPoint);
   }
 
+  /**
+   * Asks one DataNode to create a UDF. Only used in UDFManager.
+   *
+   * <p>If the request cannot be sent, the failure is reported through {@code handler.onError} so
+   * that a caller waiting for the handler to fire is not blocked forever.
+   *
+   * @param endPoint The specific DataNode
+   * @param request the function creation request to send
+   * @param handler callback that receives the DataNode's response or the failure
+   */
+  public void createFunction(
+      TEndPoint endPoint, TCreateFunctionRequest request, CreateFunctionHandler handler) {
+    try {
+      clientManager.borrowClient(endPoint).createFunction(request, handler);
+    } catch (Exception e) {
+      LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e);
+      handler.onError(e);
+    }
+  }
+
   // TODO: Is the ClientPool must be a singleton?
   private static class ClientPoolHolder {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
new file mode 100644
index 0000000000..e740a5c6a9
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class CreateFunctionHandler implements AsyncMethodCallback<TSStatus> {
+
+  private final CountDownLatch countDownLatch; // counted down once per reply or failure
+  private final List<TSStatus> dataNodeResponseStatus; // caller-supplied sink for the outcome
+  private final String ip; // only used to label error messages
+  private final int port; // only used to label error messages
+
+  public CreateFunctionHandler(
+      CountDownLatch countDownLatch, List<TSStatus> dataNodeResponseStatus, String ip, int port) {
+    this.countDownLatch = countDownLatch;
+    this.dataNodeResponseStatus = dataNodeResponseStatus;
+    this.ip = ip;
+    this.port = port;
+  }
+
+  @Override
+  public void onComplete(TSStatus response) {
+    dataNodeResponseStatus.add(response); // record first, so a released waiter sees it
+    countDownLatch.countDown();
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    dataNodeResponseStatus.add(
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("[" + ip + ":" + port + "] " + exception.getMessage()));
+    countDownLatch.countDown(); // release the waiter only after the failure is recorded
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 5362f9f95b..0ca14d6dd5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -103,9 +103,16 @@ public class ConfigNodeConf {
   private String consensusDir =
       ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
 
-  /** UDF directory, storage udf metadata and jars */
-  private String udfDir =
-      ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.UDF_FOLDER;
+  /** External lib directory, stores user-uploaded JAR files */
+  private String extLibDir = IoTDBConstant.EXT_FOLDER_NAME;
+
+  /** External lib directory for UDF, stores user-uploaded JAR files */
+  private String udfLibDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.UDF_FOLDER_NAME;
+
+  /** External temporary lib directory for storing downloaded JAR files */
+  private String temporaryLibDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
 
   /** Time partition interval in seconds */
   private long timePartitionInterval = 604800;
@@ -149,6 +156,9 @@ public class ConfigNodeConf {
   private void formulateFolders() {
     systemDir = addHomeDir(systemDir);
     consensusDir = addHomeDir(consensusDir);
+    extLibDir = addHomeDir(extLibDir);
+    udfLibDir = addHomeDir(udfLibDir);
+    temporaryLibDir = addHomeDir(temporaryLibDir);
   }
 
   private String addHomeDir(String dir) {
@@ -336,12 +346,24 @@ public class ConfigNodeConf {
     this.systemDir = systemDir;
   }
 
-  public String getUdfDir() {
-    return udfDir;
+  public String getSystemUdfDir() {
+    return getSystemDir() + File.separator + "udf" + File.separator;
+  }
+
+  public String getUdfLibDir() {
+    return udfLibDir;
+  }
+
+  public void setUdfLibDir(String udfLibDir) {
+    this.udfLibDir = udfLibDir;
+  }
+
+  public String getTemporaryLibDir() {
+    return temporaryLibDir;
   }
 
-  public void setUdfDir(String udfDir) {
-    this.udfDir = udfDir;
+  public void setTemporaryLibDir(String temporaryLibDir) {
+    this.temporaryLibDir = temporaryLibDir;
   }
 
   public int getSchemaReplicationFactor() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index b5c58efd92..2a024dd3b0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -175,7 +175,10 @@ public class ConfigNodeDescriptor {
 
       conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
 
-      conf.setUdfDir(properties.getProperty("udf_dir", conf.getUdfDir()));
+      conf.setUdfLibDir(properties.getProperty("udf_lib_dir", conf.getUdfLibDir()));
+
+      conf.setTemporaryLibDir(
+          properties.getProperty("temporary_lib_dir", conf.getTemporaryLibDir()));
 
       conf.setTimePartitionInterval(
           Long.parseLong(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index 50aa61adea..fd7d01ab92 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -187,6 +188,9 @@ public abstract class ConfigRequest implements IConsensusRequest {
         case ApplyConfigNode:
           req = new ApplyConfigNodeReq();
           break;
+        case CreateFunction:
+          req = new CreateFunctionReq();
+          break;
         case GetChildPathsPartition:
           req = new GetChildPathsPartitionReq();
           break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index a4b1606371..11fbc14ff4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -59,6 +59,7 @@ public enum ConfigRequestType {
   ListUserRoles,
   ListRoleUsers,
   ApplyConfigNode,
+  CreateFunction,
   GetChildPathsPartition,
   GetChildNodesPartition;
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
new file mode 100644
index 0000000000..4cc034a335
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CreateFunctionReq extends ConfigRequest {
+
+  private String functionName;
+  private String className;
+  private List<String> uris;
+
+  public CreateFunctionReq() {
+    super(ConfigRequestType.CreateFunction);
+  }
+
+  public CreateFunctionReq(String functionName, String className, List<String> uris) {
+    super(ConfigRequestType.CreateFunction);
+    this.functionName = functionName;
+    this.className = className;
+    this.uris = uris;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public List<String> getUris() {
+    return uris;
+  }
+
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    buffer.putInt(getType().ordinal());
+
+    ReadWriteIOUtils.write(functionName, buffer);
+    ReadWriteIOUtils.write(className, buffer);
+
+    final int size = uris.size();
+    ReadWriteIOUtils.write(size, buffer);
+    for (String uri : uris) {
+      ReadWriteIOUtils.write(uri, buffer);
+    }
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    functionName = ReadWriteIOUtils.readString(buffer);
+    className = ReadWriteIOUtils.readString(buffer);
+
+    final int size = ReadWriteIOUtils.readInt(buffer);
+    uris = new ArrayList<>(size);
+    for (int i = 0; i < size; ++i) {
+      uris.add(ReadWriteIOUtils.readString(buffer));
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index fcdd767d39..83775e0fd8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -104,6 +105,9 @@ public class ConfigManager implements Manager {
   /** Manage procedure */
   private final ProcedureManager procedureManager;
 
+  /** UDF */
+  private final UDFManager udfManager;
+
   public ConfigManager() throws IOException {
     // Build the persistence module
     NodeInfo nodeInfo = new NodeInfo();
@@ -111,11 +115,12 @@ public class ConfigManager implements Manager {
     PartitionInfo partitionInfo = new PartitionInfo();
     AuthorInfo authorInfo = new AuthorInfo();
     ProcedureInfo procedureInfo = new ProcedureInfo();
+    UDFInfo udfInfo = new UDFInfo();
 
     // Build state machine and executor
     ConfigRequestExecutor executor =
         new ConfigRequestExecutor(
-            nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo);
+            nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo, udfInfo);
     PartitionRegionStateMachine stateMachine = new PartitionRegionStateMachine(this, executor);
 
     // Build the manager module
@@ -124,6 +129,7 @@ public class ConfigManager implements Manager {
     this.partitionManager = new PartitionManager(this, partitionInfo);
     this.permissionManager = new PermissionManager(this, authorInfo);
     this.procedureManager = new ProcedureManager(this, procedureInfo);
+    this.udfManager = new UDFManager(this, udfInfo);
     this.loadManager = new LoadManager(this);
     this.consensusManager = new ConsensusManager(stateMachine);
 
@@ -599,6 +605,19 @@ public class ConfigManager implements Manager {
     return nodeManager.applyConfigNode(applyConfigNodeReq);
   }
 
+  @Override
+  public TSStatus createFunction(String udfName, String className, List<String> uris) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? udfManager.createFunction(udfName, className, uris)
+        : status;
+  }
+
+  @Override
+  public UDFManager getUDFManager() {
+    return udfManager;
+  }
+
   public ProcedureManager getProcedureManager() {
     return procedureManager;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 16a07d050a..47892d1fbc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -90,6 +90,13 @@ public interface Manager {
    */
   LoadManager getLoadManager();
 
+  /**
+   * Get UDFManager
+   *
+   * @return UDFManager instance
+   */
+  UDFManager getUDFManager();
+
   /**
    * Register DataNode
    *
@@ -218,4 +225,6 @@ public interface Manager {
    * @return status
    */
   TSStatus applyConfigNode(ApplyConfigNodeReq applyConfigNodeReq);
+
+  TSStatus createFunction(String udfName, String className, List<String> uris);
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
new file mode 100644
index 0000000000..a3f7b03034
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+
+public class UDFManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDFManager.class);
+
+  private final ConfigManager configManager;
+  private final UDFInfo udfInfo;
+
+  public UDFManager(ConfigManager configManager, UDFInfo udfInfo) {
+    this.configManager = configManager;
+    this.udfInfo = udfInfo;
+  }
+
+  public TSStatus createFunction(String functionName, String className, List<String> uris) {
+    try {
+      udfInfo.validateBeforeRegistration(functionName, className, uris); // throws when invalid
+
+      final TSStatus configNodeStatus =
+          configManager
+              .getConsensusManager()
+              .write(new CreateFunctionReq(functionName, className, uris))
+              .getStatus();
+      if (configNodeStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return configNodeStatus; // consensus write failed, so the DataNodes are not contacted
+      }
+
+      return squashDataNodeResponseStatusList(
+          createFunctionOnDataNodes(functionName, className, uris));
+    } catch (Exception e) {
+      final String errorMessage =
+          String.format(
+              "Failed to register UDF %s(class name: %s, uris: %s), because of exception: %s",
+              functionName, className, uris, e);
+      LOGGER.warn(errorMessage, e); // pass the exception so the stack trace is logged
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(errorMessage);
+    }
+  }
+
+  private List<TSStatus> createFunctionOnDataNodes(
+      String functionName, String className, List<String> uris) {
+    final List<TDataNodeInfo> onlineDataNodes =
+        configManager.getNodeManager().getOnlineDataNodes(-1);
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
+    final CountDownLatch countDownLatch = new CountDownLatch(onlineDataNodes.size());
+    final TCreateFunctionRequest request =
+        new TCreateFunctionRequest(functionName, className, uris);
+
+    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+      final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint();
+      AsyncDataNodeClientPool.getInstance()
+          .createFunction(
+              endPoint,
+              request,
+              new CreateFunctionHandler(
+                  countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
+    }
+
+    try {
+      countDownLatch.await(); // returns once the latch has been counted down per DataNode
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.error("UDFManager was interrupted during creating functions on data nodes", e);
+    }
+
+    return dataNodeResponseStatus;
+  }
+
+  private TSStatus squashDataNodeResponseStatusList(List<TSStatus> dataNodeResponseStatusList) {
+    final List<TSStatus> failedStatus =
+        dataNodeResponseStatusList.stream()
+            .filter(status -> status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
+            .collect(Collectors.toList());
+    return failedStatus.isEmpty() // any non-success status turns the whole result into an error
+        ? new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+        : new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage(failedStatus.toString());
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
new file mode 100644
index 0000000000..f0aa594d44
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.udf.service.UDFClassLoader;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFExecutableResource;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class UDFInfo implements SnapshotProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDFInfo.class);
+
+  private static final ConfigNodeConf CONFIG_NODE_CONF =
+      ConfigNodeDescriptor.getInstance().getConf();
+
+  private final UDFExecutableManager udfExecutableManager;
+  private final UDFRegistrationService udfRegistrationService;
+
+  public UDFInfo() {
+    udfExecutableManager = UDFExecutableManager.getInstance();
+    udfRegistrationService = UDFRegistrationService.getInstance();
+  }
+
+  public synchronized void validateBeforeRegistration(
+      String functionName, String className, List<String> uris) throws Exception {
+    udfRegistrationService.validate(functionName, className);
+
+    if (uris.isEmpty()) {
+      fetchExecutablesAndCheckInstantiation(className);
+    } else {
+      fetchExecutablesAndCheckInstantiation(className, uris);
+    }
+  }
+
+  private void fetchExecutablesAndCheckInstantiation(String className) throws Exception {
+    try (UDFClassLoader temporaryUdfClassLoader =
+        new UDFClassLoader(CONFIG_NODE_CONF.getUdfLibDir())) {
+      Class.forName(className, true, temporaryUdfClassLoader)
+          .getDeclaredConstructor()
+          .newInstance();
+    }
+  }
+
+  private void fetchExecutablesAndCheckInstantiation(String className, List<String> uris)
+      throws Exception {
+    final UDFExecutableResource resource = udfExecutableManager.request(uris);
+    try (UDFClassLoader temporaryUdfClassLoader = new UDFClassLoader(resource.getResourceDir())) {
+      Class.forName(className, true, temporaryUdfClassLoader)
+          .getDeclaredConstructor()
+          .newInstance();
+    } finally {
+      udfExecutableManager.removeFromTemporaryLibRoot(resource);
+    }
+  }
+
+  public synchronized TSStatus createFunction(CreateFunctionReq req) {
+    final String functionName = req.getFunctionName();
+    final String className = req.getClassName();
+    final List<String> uris = req.getUris();
+
+    try {
+      udfRegistrationService.register(functionName, className, uris, udfExecutableManager, true);
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (Exception e) {
+      final String errorMessage =
+          String.format(
+              "Failed to register UDF %s(class name: %s, uris: %s), because of exception: %s",
+              functionName, className, uris, e);
+      LOGGER.warn(errorMessage, e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(errorMessage);
+    }
+  }
+
+  @Override
+  public synchronized boolean processTakeSnapshot(File snapshotDir) throws IOException {
+    return udfExecutableManager.processTakeSnapshot(snapshotDir)
+        && udfRegistrationService.processTakeSnapshot(snapshotDir);
+  }
+
+  @Override
+  public synchronized void processLoadSnapshot(File snapshotDir) throws IOException {
+    udfExecutableManager.processLoadSnapshot(snapshotDir);
+    udfRegistrationService.processLoadSnapshot(snapshotDir);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index dfad8c10c4..1a008ffbcf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -54,6 +55,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -83,17 +85,21 @@ public class ConfigRequestExecutor {
 
   private final ProcedureInfo procedureInfo;
 
+  private final UDFInfo udfInfo;
+
   public ConfigRequestExecutor(
       NodeInfo nodeInfo,
       ClusterSchemaInfo clusterSchemaInfo,
       PartitionInfo partitionInfo,
       AuthorInfo authorInfo,
-      ProcedureInfo procedureInfo) {
+      ProcedureInfo procedureInfo,
+      UDFInfo udfInfo) {
     this.nodeInfo = nodeInfo;
     this.clusterSchemaInfo = clusterSchemaInfo;
     this.partitionInfo = partitionInfo;
     this.authorInfo = authorInfo;
     this.procedureInfo = procedureInfo;
+    this.udfInfo = udfInfo;
   }
 
   public DataSet executorQueryPlan(ConfigRequest req)
@@ -181,6 +187,8 @@ public class ConfigRequestExecutor {
         return authorInfo.authorNonQuery((AuthorReq) req);
       case ApplyConfigNode:
         return nodeInfo.updateConfigNodeList((ApplyConfigNodeReq) req);
+      case CreateFunction:
+        return udfInfo.createFunction((CreateFunctionReq) req);
       default:
         throw new UnknownPhysicalPlanTypeException(req.getType());
     }
@@ -290,6 +298,7 @@ public class ConfigRequestExecutor {
     allAttributes.add(clusterSchemaInfo);
     allAttributes.add(partitionInfo);
     allAttributes.add(nodeInfo);
+    allAttributes.add(udfInfo);
     return allAttributes;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 8a431f2522..c4641bc4d9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.confignode.service;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
 import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
@@ -75,11 +80,24 @@ public class ConfigNode implements ConfigNodeMBean {
     registerManager.register(new JMXService());
     JMXService.registerMBean(this, mbeanName);
 
+    registerUdfServices();
+
     configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
     registerManager.register(configNodeRPCService);
     LOGGER.info("Init rpc server success");
   }
 
+  private void registerUdfServices() throws StartupException {
+    final ConfigNodeConf configNodeConf = ConfigNodeDescriptor.getInstance().getConf();
+    registerManager.register( // executables: temporary lib dir + UDF lib dir
+        UDFExecutableManager.setupAndGetInstance(
+            configNodeConf.getTemporaryLibDir(), configNodeConf.getUdfLibDir()));
+    registerManager.register( // class loaders: same UDF lib dir as above
+        UDFClassLoaderManager.setupAndGetInstance(configNodeConf.getUdfLibDir()));
+    registerManager.register( // registrations: system UDF dir
+        UDFRegistrationService.setupAndGetInstance(configNodeConf.getSystemUdfDir()));
+  }
+
   public void active() {
     try {
       setUp();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 6cc00fb56f..cbb0e54424 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -388,6 +389,11 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
     return status;
   }
 
+  @Override
+  public TSStatus createFunction(TCreateFunctionReq req) { // delegates to ConfigManager
+    return configManager.createFunction(req.getUdfName(), req.getClassName(), req.getUris());
+  }
+
   public void handleClientExit() {}
 
   // TODO: Interfaces for data operations
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 7a481933c1..10e335f910 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -175,6 +175,7 @@ public class IoTDBConstant {
   public static final String TRACING_LOG = "tracing.txt";
   public static final String EXT_FOLDER_NAME = "ext";
   public static final String UDF_FOLDER_NAME = "udf";
+  public static final String TMP_FOLDER_NAME = "temporary";
   public static final String TRIGGER_FOLDER_NAME = "trigger";
   public static final String MQTT_FOLDER_NAME = "mqtt";
   public static final String WAL_FOLDER_NAME = "wal";
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 1e913818cc..02ca71b26c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -44,6 +44,7 @@ public enum ServiceType {
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
   UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),
   UDF_REGISTRATION_SERVICE("UDF Registration Service", ""),
+  UDF_EXECUTABLE_MANAGER_SERVICE("UDF Executable Manager Service", ""),
   TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
   TRIGGER_REGISTRATION_SERVICE("Trigger Registration Service", ""),
   CACHE_HIT_RATIO_DISPLAY_SERVICE(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
new file mode 100644
index 0000000000..9c30095196
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+public class SnapshotUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDFExecutableManager.class);
+
+  public static boolean takeSnapshotForDir(String source, String snapshotDestination)
+      throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(source);
+    final File destinationFile = systemFileFactory.getFile(snapshotDestination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+    if (!sourceFile.exists()) {
+      return true;
+    }
+
+    FileUtils.deleteQuietly(temporaryFile);
+    FileUtils.forceMkdir(temporaryFile);
+
+    try {
+      FileUtils.copyDirectory(sourceFile, temporaryFile);
+      FileUtils.deleteQuietly(destinationFile);
+      return temporaryFile.renameTo(destinationFile);
+    } finally {
+      FileUtils.deleteQuietly(temporaryFile);
+    }
+  }
+
+  public static void loadSnapshotForDir(String snapshotSource, String destination)
+      throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(snapshotSource);
+    final File destinationFile = systemFileFactory.getFile(destination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+    if (!sourceFile.exists()) {
+      return;
+    }
+
+    try {
+      if (destinationFile.exists()) {
+        FileUtils.deleteQuietly(temporaryFile);
+        FileUtils.moveDirectory(destinationFile, temporaryFile);
+      }
+
+      FileUtils.forceMkdir(destinationFile);
+
+      try {
+        FileUtils.copyDirectory(sourceFile, destinationFile);
+      } catch (Exception e) {
+        LOGGER.error("Failed to load udf snapshot and rollback.");
+        FileUtils.deleteQuietly(destinationFile);
+
+        if (temporaryFile.exists()) {
+          FileUtils.moveDirectory(temporaryFile, destinationFile);
+        }
+      }
+    } finally {
+      FileUtils.deleteQuietly(temporaryFile);
+    }
+  }
+
+  private SnapshotUtils() {}
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
index 02fb489021..8d92f4ccff 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
@@ -82,6 +82,10 @@ public class UDFClassLoaderManager implements IService {
     return activeClassLoader;
   }
 
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // IService
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
   @Override
   public void start() throws StartupException {
     try {
@@ -110,6 +114,10 @@ public class UDFClassLoaderManager implements IService {
     return ServiceType.UDF_CLASSLOADER_MANAGER_SERVICE;
   }
 
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // singleton instance holder
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
   private static UDFClassLoaderManager INSTANCE = null;
 
   public static synchronized UDFClassLoaderManager setupAndGetInstance(String libRoot) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
new file mode 100644
index 0000000000..f7f18d7171
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class UDFExecutableManager implements IService, SnapshotProcessor {
+
+  private final String temporaryLibRoot;
+  private final String udfLibRoot;
+
+  private final AtomicLong requestCounter;
+
+  private UDFExecutableManager(String temporaryLibRoot, String udfLibRoot) {
+    this.temporaryLibRoot = temporaryLibRoot;
+    this.udfLibRoot = udfLibRoot;
+
+    requestCounter = new AtomicLong(0);
+  }
+
+  public UDFExecutableResource request(List<String> uris) throws URISyntaxException, IOException {
+    final long requestId = generateNextRequestId();
+    downloadExecutables(uris, requestId);
+    return new UDFExecutableResource(requestId, getDirStringByRequestId(requestId));
+  }
+
+  public void moveToExtLibDir(UDFExecutableResource resource, String functionName)
+      throws IOException {
+    FileUtils.moveDirectory(
+        getDirByRequestId(resource.getRequestId()), getDirByFunctionName(functionName));
+  }
+
+  public void removeFromTemporaryLibRoot(UDFExecutableResource resource) {
+    removeFromTemporaryLibRoot(resource.getRequestId());
+  }
+
+  public void removeFromExtLibDir(String functionName) {
+    FileUtils.deleteQuietly(getDirByFunctionName(functionName));
+  }
+
+  private synchronized long generateNextRequestId() throws IOException {
+    long requestId = requestCounter.getAndIncrement();
+    while (FileUtils.isDirectory(getDirByRequestId(requestId))) {
+      requestId = requestCounter.getAndIncrement();
+    }
+    FileUtils.forceMkdir(getDirByRequestId(requestId));
+    return requestId;
+  }
+
+  private void downloadExecutables(List<String> uris, long requestId)
+      throws IOException, URISyntaxException {
+    // TODO: para download
+    try {
+      for (String uriString : uris) {
+        final URL url = new URI(uriString).toURL();
+        final String fileName = uriString.substring(uriString.lastIndexOf("/") + 1);
+        final String destination =
+            temporaryLibRoot + File.separator + requestId + File.separator + fileName;
+        FileUtils.copyURLToFile(url, FSFactoryProducer.getFSFactory().getFile(destination));
+      }
+    } catch (Exception e) {
+      removeFromTemporaryLibRoot(requestId);
+      throw e;
+    }
+  }
+
+  private void removeFromTemporaryLibRoot(long requestId) {
+    FileUtils.deleteQuietly(getDirByRequestId(requestId));
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // dir string and dir file generation
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public File getDirByRequestId(long requestId) {
+    return FSFactoryProducer.getFSFactory().getFile(getDirStringByRequestId(requestId));
+  }
+
+  public String getDirStringByRequestId(long requestId) {
+    return temporaryLibRoot + File.separator + requestId + File.separator;
+  }
+
+  public File getDirByFunctionName(String functionName) {
+    return FSFactoryProducer.getFSFactory().getFile(getDirStringByFunctionName(functionName));
+  }
+
+  public String getDirStringByFunctionName(String functionName) {
+    return udfLibRoot + File.separator + functionName + File.separator;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // IService
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public void start() throws StartupException {
+    try {
+      makeDirIfNecessary(temporaryLibRoot);
+      makeDirIfNecessary(udfLibRoot);
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    // nothing to do
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.UDF_EXECUTABLE_MANAGER_SERVICE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // singleton instance holder
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private static UDFExecutableManager INSTANCE = null;
+
+  public static synchronized UDFExecutableManager setupAndGetInstance(
+      String temporaryLibRoot, String udfLibRoot) {
+    if (INSTANCE == null) {
+      INSTANCE = new UDFExecutableManager(temporaryLibRoot, udfLibRoot);
+    }
+    return INSTANCE;
+  }
+
+  private static void makeDirIfNecessary(String dir) throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(dir);
+    if (file.exists() && file.isDirectory()) {
+      return;
+    }
+    FileUtils.forceMkdir(file);
+  }
+
+  public static UDFExecutableManager getInstance() {
+    return INSTANCE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // SnapshotProcessor
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+    return SnapshotUtils.takeSnapshotForDir(
+            temporaryLibRoot,
+            snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary")
+        && SnapshotUtils.takeSnapshotForDir(
+            udfLibRoot,
+            snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf");
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException {
+    SnapshotUtils.loadSnapshotForDir(
+        snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary",
+        temporaryLibRoot);
+    SnapshotUtils.loadSnapshotForDir(
+        snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf",
+        udfLibRoot);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
new file mode 100644
index 0000000000..ec0c375997
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+public class UDFExecutableResource {
+  // Immutable pair of a request id and the directory string associated with that request.
+  private final long requestId;
+  private final String resourceDir;
+
+  public UDFExecutableResource(long requestId, String resourceDir) {
+    this.requestId = requestId;
+    this.resourceDir = resourceDir;
+  }
+
+  public long getRequestId() {
+    return requestId;
+  }
+
+  public String getResourceDir() {
+    return resourceDir;
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java
index 51068cf99b..54e8bde4a0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java
@@ -25,14 +25,18 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.List;
 
 public class UDFLogWriter {
 
-  public static final Byte REGISTER_TYPE = 0;
+  public static final Byte REGISTER_WITHOUT_URIS_TYPE = 0;
   public static final Byte DEREGISTER_TYPE = 1;
+  public static final Byte REGISTER_WITH_URIS_TYPE = 2;
 
-  private static final String REGISTER_TYPE_STRING = REGISTER_TYPE.toString();
+  private static final String REGISTER_WITHOUT_URIS_TYPE_STRING =
+      REGISTER_WITHOUT_URIS_TYPE.toString();
   private static final String DEREGISTER_TYPE_STRING = DEREGISTER_TYPE.toString();
+  private static final String REGISTER_WITH_URIS_TYPE_STRING = REGISTER_WITH_URIS_TYPE.toString();
 
   private final File logFile;
   private final BufferedWriter writer;
@@ -53,8 +57,17 @@ public class UDFLogWriter {
     }
   }
 
-  public void register(String functionName, String className) throws IOException {
-    writer.write(String.format("%s,%s,%s", REGISTER_TYPE_STRING, functionName, className));
+  public void register(String functionName, String className, List<String> uris)
+      throws IOException {
+    if (uris.isEmpty()) {
+      writer.write(
+          String.format("%s,%s,%s", REGISTER_WITHOUT_URIS_TYPE_STRING, functionName, className));
+    } else {
+      writer.write(
+          String.format(
+              "%s,%s,%s,%s",
+              REGISTER_WITH_URIS_TYPE_STRING, functionName, className, String.join(",", uris)));
+    }
     writeLineAndFlush();
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java
index a4997ead0b..feb799f2db 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java
@@ -22,19 +22,26 @@ package org.apache.iotdb.commons.udf.service;
 import org.apache.iotdb.commons.udf.api.UDTF;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.List;
 
 public class UDFRegistrationInformation {
 
   private final String functionName;
   private final String className;
+  private final List<String> uris;
   private final boolean isBuiltin;
 
   private Class<?> functionClass;
 
   public UDFRegistrationInformation(
-      String functionName, String className, boolean isBuiltin, Class<?> functionClass) {
+      String functionName,
+      String className,
+      List<String> uris,
+      boolean isBuiltin,
+      Class<?> functionClass) {
     this.functionName = functionName;
     this.className = className;
+    this.uris = uris;
     this.isBuiltin = isBuiltin;
     this.functionClass = functionClass;
   }
@@ -47,6 +54,10 @@ public class UDFRegistrationInformation {
     return className;
   }
 
+  public List<String> getUris() {
+    return uris;
+  }
+
   public boolean isBuiltin() {
     return isBuiltin;
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index 3f9be546d1..46a37eb356 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.commons.udf.api.UDF;
 import org.apache.iotdb.commons.udf.api.exception.UDFRegistrationException;
 import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
@@ -38,14 +39,17 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class UDFRegistrationService implements IService {
+public class UDFRegistrationService implements IService, SnapshotProcessor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(UDFRegistrationService.class);
 
@@ -54,7 +58,7 @@ public class UDFRegistrationService implements IService {
   private final String temporaryLogFileName;
 
   private final ReentrantLock registrationLock;
-  private final ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
+  private ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
 
   private final ReentrantReadWriteLock logWriterLock;
   private UDFLogWriter logWriter;
@@ -77,13 +81,36 @@ public class UDFRegistrationService implements IService {
     registrationLock.unlock();
   }
 
+  /** Invoked by the config leader: runs the pre-registration checks without registering. */
+  public void validate(String functionName, String className) {
+    functionName = functionName.toUpperCase();
+    validateFunctionName(functionName, className);
+    checkIfRegistered(functionName, className);
+  }
+
   public void register(String functionName, String className, boolean writeToTemporaryLogFile)
       throws UDFRegistrationException {
     functionName = functionName.toUpperCase();
     validateFunctionName(functionName, className);
     checkIfRegistered(functionName, className);
-    doRegister(functionName, className);
-    tryAppendRegistrationLog(functionName, className, writeToTemporaryLogFile);
+    doRegister(functionName, className, Collections.emptyList());
+    tryAppendRegistrationLog(
+        functionName, className, Collections.emptyList(), writeToTemporaryLogFile);
+  }
+
+  public void register(
+      String functionName,
+      String className,
+      List<String> uris,
+      UDFExecutableManager udfExecutableManager,
+      boolean writeToTemporaryLogFile)
+      throws UDFRegistrationException {
+    functionName = functionName.toUpperCase(); // every later step uses the upper-cased name
+    validateFunctionName(functionName, className);
+    checkIfRegistered(functionName, className);
+    downloadExecutableResources(functionName, className, uris, udfExecutableManager);
+    doRegister(functionName, className, uris); // NOTE(review): no file cleanup if this throws
+    tryAppendRegistrationLog(functionName, className, uris, writeToTemporaryLogFile);
+  }
 
   private static void validateFunctionName(String functionName, String className)
@@ -132,7 +159,38 @@ public class UDFRegistrationService implements IService {
     throw new UDFRegistrationException(errorMessage);
   }
 
-  private void doRegister(String functionName, String className) throws UDFRegistrationException {
+  private void downloadExecutableResources(
+      String functionName,
+      String className,
+      List<String> uris,
+      UDFExecutableManager udfExecutableManager)
+      throws UDFRegistrationException {
+    if (uris.isEmpty()) { // nothing to fetch
+      return;
+    }
+    // Fetch into a temporary request dir first, then replace the function's lib dir with it.
+    try {
+      final UDFExecutableResource resource = udfExecutableManager.request(uris);
+      try {
+        udfExecutableManager.removeFromExtLibDir(functionName);
+        udfExecutableManager.moveToExtLibDir(resource, functionName);
+      } catch (Exception innerException) { // drop both the lib dir and the download, then rethrow
+        udfExecutableManager.removeFromExtLibDir(functionName);
+        udfExecutableManager.removeFromTemporaryLibRoot(resource);
+        throw innerException;
+      }
+    } catch (Exception outerException) { // covers both the request and the move above
+      String errorMessage =
+          String.format(
+              "Failed to register UDF %s(%s) because failed to fetch UDF executables(%s)",
+              functionName, className, uris);
+      LOGGER.warn(errorMessage);
+      throw new UDFRegistrationException(errorMessage, outerException);
+    }
+  }
+
+  private void doRegister(String functionName, String className, List<String> uris)
+      throws UDFRegistrationException {
     acquireRegistrationLock();
     try {
       UDFClassLoader currentActiveClassLoader =
@@ -143,7 +201,7 @@ public class UDFRegistrationService implements IService {
       functionClass.getDeclaredConstructor().newInstance();
       registrationInformation.put(
           functionName,
-          new UDFRegistrationInformation(functionName, className, false, functionClass));
+          new UDFRegistrationInformation(functionName, className, uris, false, functionClass));
     } catch (IOException
         | InstantiationException
         | InvocationTargetException
@@ -162,14 +220,14 @@ public class UDFRegistrationService implements IService {
   }
 
   private void tryAppendRegistrationLog(
-      String functionName, String className, boolean writeToTemporaryLogFile)
+      String functionName, String className, List<String> uris, boolean writeToTemporaryLogFile)
       throws UDFRegistrationException {
     if (!writeToTemporaryLogFile) {
       return;
     }
 
     try {
-      appendRegistrationLog(functionName, className);
+      appendRegistrationLog(functionName, className, uris);
     } catch (IOException e) {
       registrationInformation.remove(functionName);
       String errorMessage =
@@ -218,10 +276,11 @@ public class UDFRegistrationService implements IService {
     }
   }
 
-  private void appendRegistrationLog(String functionName, String className) throws IOException {
+  private void appendRegistrationLog(String functionName, String className, List<String> uris)
+      throws IOException {
     logWriterLock.writeLock().lock();
     try {
-      logWriter.register(functionName, className);
+      logWriter.register(functionName, className, uris);
     } finally {
       logWriterLock.writeLock().unlock();
     }
@@ -275,15 +334,20 @@ public class UDFRegistrationService implements IService {
   @Override
   public void start() throws StartupException {
     try {
-      registerBuiltinTimeSeriesGeneratingFunctions();
-      makeDirIfNecessary();
-      doRecovery();
-      logWriter = new UDFLogWriter(logFileName);
+      recovery();
     } catch (Exception e) {
       throw new StartupException(e);
     }
   }
 
+  /**
+   * Rebuilds the registration state of this service. In order, it replaces {@code
+   * registrationInformation} with a fresh empty map, registers the built-in time series generating
+   * functions, calls {@code makeDirIfNecessary()} and {@code doRecovery()}, and finally assigns a
+   * new {@link UDFLogWriter} on {@code logFileName} to {@code logWriter}.
+   *
+   * <p>Called from {@code start()} and from {@code processLoadSnapshot(File)}.
+   *
+   * <p>NOTE(review): when this runs again after a snapshot load, the previously assigned {@code
+   * logWriter} is overwritten here without being closed -- confirm whether it must be closed first.
+   *
+   * @throws Exception if any of the steps above fails
+   */
+  private void recovery() throws Exception {
+    // Start from an empty map so that entries from a previous run do not survive a reload.
+    registrationInformation = new ConcurrentHashMap<>();
+    registerBuiltinTimeSeriesGeneratingFunctions();
+    makeDirIfNecessary();
+    doRecovery();
+    logWriter = new UDFLogWriter(logFileName);
+  }
+
   private void registerBuiltinTimeSeriesGeneratingFunctions() {
     for (BuiltinTimeSeriesGeneratingFunction builtinTimeSeriesGeneratingFunction :
         BuiltinTimeSeriesGeneratingFunction.values()) {
@@ -293,6 +357,7 @@ public class UDFRegistrationService implements IService {
           new UDFRegistrationInformation(
               functionName,
               builtinTimeSeriesGeneratingFunction.getClassName(),
+              Collections.emptyList(),
               true,
               builtinTimeSeriesGeneratingFunction.getFunctionClass()));
     }
@@ -331,10 +396,13 @@ public class UDFRegistrationService implements IService {
       while ((line = reader.readLine()) != null) {
         String[] data = line.split(",");
         byte type = Byte.parseByte(data[0]);
-        if (type == UDFLogWriter.REGISTER_TYPE) {
+        if (type == UDFLogWriter.REGISTER_WITHOUT_URIS_TYPE
+            || type == UDFLogWriter.REGISTER_WITH_URIS_TYPE) {
           recoveredUDFs.put(data[1], data[2]);
         } else if (type == UDFLogWriter.DEREGISTER_TYPE) {
           recoveredUDFs.remove(data[1]);
+        } else {
+          throw new UnsupportedEncodingException();
         }
       }
     }
@@ -370,7 +438,8 @@ public class UDFRegistrationService implements IService {
       if (information.isBuiltin()) {
         continue;
       }
-      temporaryLogFile.register(information.getFunctionName(), information.getClassName());
+      temporaryLogFile.register(
+          information.getFunctionName(), information.getClassName(), information.getUris());
     }
     temporaryLogFile.close();
   }
@@ -391,7 +460,9 @@ public class UDFRegistrationService implements IService {
     Class<?> functionClass = Class.forName(className, true, classLoader);
     functionName = functionName.toUpperCase();
     registrationInformation.put(
-        functionName, new UDFRegistrationInformation(functionName, className, true, functionClass));
+        functionName,
+        new UDFRegistrationInformation(
+            functionName, className, Collections.emptyList(), true, functionClass));
   }
 
   @TestOnly
@@ -416,4 +487,26 @@ public class UDFRegistrationService implements IService {
   public static UDFRegistrationService getInstance() {
     return INSTANCE;
   }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // SnapshotProcessor
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Takes a snapshot of {@code ulogFileDir} into the {@code udf} sub-directory of the given
+   * snapshot directory.
+   *
+   * @param snapshotDir root directory of the snapshot being taken
+   * @return the result reported by {@code SnapshotUtils.takeSnapshotForDir}
+   * @throws IOException if the underlying snapshot utility fails
+   */
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+    final String udfSnapshotDir = snapshotDir.getAbsolutePath() + File.separator + "udf";
+    return SnapshotUtils.takeSnapshotForDir(ulogFileDir, udfSnapshotDir);
+  }
+
+  /**
+   * Loads the {@code udf} sub-directory of the given snapshot directory into {@code ulogFileDir}
+   * and then re-runs {@code recovery()} on the loaded files.
+   *
+   * @param snapshotDir root directory of the snapshot being loaded
+   * @throws IOException if loading the snapshot fails, or wrapping whatever exception {@code
+   *     recovery()} raised
+   */
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException {
+    final String udfSnapshotDir = snapshotDir.getAbsolutePath() + File.separator + "udf";
+    SnapshotUtils.loadSnapshotForDir(udfSnapshotDir, ulogFileDir);
+
+    try {
+      recovery();
+    } catch (Exception recoveryFailure) {
+      // recovery() declares a plain Exception; this method may only surface IOException.
+      throw new IOException(recoveryFailure);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ebd6034542..2e0ce66ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -588,6 +589,22 @@ public class ConfigNodeClient implements ConfigIService.Iface, SyncThriftClient,
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  /**
+   * Sends a create-function request to the config node, trying at most {@code RETRY_NUM} times.
+   *
+   * <p>The status is returned as soon as {@code updateConfigNodeLeader(status)} reports {@code
+   * false}. Otherwise, or when the call throws a {@link TException} (in which case {@code
+   * configLeader} is cleared), the client reconnects and tries again.
+   *
+   * @param req the create-function request
+   * @return the status returned by the config node
+   * @throws TException if every attempt failed, or if reconnecting throws
+   */
+  @Override
+  public TSStatus createFunction(TCreateFunctionReq req) throws TException {
+    int attempt = 0;
+    while (attempt < RETRY_NUM) {
+      try {
+        final TSStatus status = client.createFunction(req);
+        final boolean needsRetry = updateConfigNodeLeader(status);
+        if (!needsRetry) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+      attempt++;
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   public static class Factory extends BaseClientFactory<PartitionRegionId, ConfigNodeClient> {
 
     public Factory(
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5b03bbfaa0..adf1e404a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -252,6 +252,10 @@ public class IoTDBConfig {
   private String udfDir =
       IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.UDF_FOLDER_NAME;
 
+  /** External temporary lib directory for storing downloaded JAR files */
+  private String temporaryLibDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+
   /** External lib directory for trigger, stores user-uploaded JAR files */
   private String triggerDir =
       IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TRIGGER_FOLDER_NAME;
@@ -997,6 +1001,7 @@ public class IoTDBConfig {
     indexRootFolder = addHomeDir(indexRootFolder);
     extDir = addHomeDir(extDir);
     udfDir = addHomeDir(udfDir);
+    temporaryLibDir = addHomeDir(temporaryLibDir);
     triggerDir = addHomeDir(triggerDir);
     mqttDir = addHomeDir(mqttDir);
     for (int i = 0; i < walDirs.length; i++) {
@@ -1191,6 +1196,10 @@ public class IoTDBConfig {
     return udfDir;
   }
 
+  /** Returns the configured temporary lib directory ({@code temporaryLibDir}). */
+  public String getTemporaryLibDir() {
+    return temporaryLibDir;
+  }
+
   public void setUdfDir(String udfDir) {
     this.udfDir = udfDir;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 671edfb68b..ebd9327e71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
@@ -51,6 +52,7 @@ public class ConfigTaskVisitor
     return new SetStorageGroupTask(statement);
   }
 
+  @Override
   public IConfigTask visitDeleteStorageGroup(
       DeleteStorageGroupStatement statement, TaskContext context) {
     return new DeleteStorageGroupTask(statement);
@@ -88,5 +90,11 @@ public class ConfigTaskVisitor
     return new AuthorizerConfigTask(statement);
   }
 
+  /**
+   * Wraps the given create-function statement in a {@link CreateFunctionTask}. The task context is
+   * not used.
+   */
+  @Override
+  public IConfigTask visitCreateFunction(
+      CreateFunctionStatement createFunctionStatement, TaskContext context) {
+    return new CreateFunctionTask(createFunctionStatement);
+  }
+
   public static class TaskContext {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/CreateFunctionTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/CreateFunctionTask.java
new file mode 100644
index 0000000000..56b95836bf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/CreateFunctionTask.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Config task that executes a create-function statement.
+ *
+ * <p>In cluster mode the request is forwarded to the config node through a {@link
+ * ConfigNodeClient}; otherwise the function is registered directly with the local {@link
+ * UDFRegistrationService}. In both modes the outcome is reported through the returned future.
+ */
+public class CreateFunctionTask implements IConfigTask {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreateFunctionTask.class);
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private final String udfName;
+  private final String className;
+  /** String form of the URIs carried by the statement, as expected by the RPC request. */
+  private final List<String> uris;
+
+  /**
+   * Captures the function name, class name and URIs of the statement.
+   *
+   * @param createFunctionStatement the statement to execute
+   */
+  public CreateFunctionTask(CreateFunctionStatement createFunctionStatement) {
+    udfName = createFunctionStatement.getUdfName();
+    className = createFunctionStatement.getClassName();
+    uris =
+        createFunctionStatement.getUris().stream().map(URI::toString).collect(Collectors.toList());
+  }
+
+  /**
+   * Runs the task, choosing the cluster or standalone path based on {@code
+   * CONFIG.isClusterMode()}.
+   *
+   * @param clientManager source of config node clients; only used in cluster mode
+   * @return a future carrying either a success result or the failure as its exception
+   */
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(
+      IClientManager<PartitionRegionId, ConfigNodeClient> clientManager)
+      throws InterruptedException {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    if (CONFIG.isClusterMode()) {
+      executeCluster(clientManager, future);
+    } else {
+      executeStandalone(future);
+    }
+    return future;
+  }
+
+  /**
+   * Sends the create-function request to the config node and completes {@code future} with the
+   * outcome: success, a {@link StatementExecutionException} for a non-success status, or the
+   * connection-level exception.
+   */
+  private void executeCluster(
+      IClientManager<PartitionRegionId, ConfigNodeClient> clientManager,
+      SettableFuture<ConfigTaskResult> future) {
+    try (ConfigNodeClient client = clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      final TSStatus executionStatus =
+          client.createFunction(new TCreateFunctionReq(udfName, className, uris));
+
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+        LOGGER.error(
+            "[{}] Failed to create function {}({}) in config node, URI: {}.",
+            executionStatus,
+            udfName,
+            className,
+            uris);
+        future.setException(new StatementExecutionException(executionStatus));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (TException | IOException e) {
+      // Pass the exception to the logger so the cause and stack trace are not lost.
+      LOGGER.error("Failed to connect to config node.", e);
+      future.setException(e);
+    }
+  }
+
+  /**
+   * Registers the function with the local {@link UDFRegistrationService} and completes {@code
+   * future} with success, or with a {@link StatementExecutionException} describing the failure.
+   */
+  private void executeStandalone(SettableFuture<ConfigTaskResult> future) {
+    try {
+      UDFRegistrationService.getInstance()
+          .register(udfName, className, uris, UDFExecutableManager.getInstance(), true);
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } catch (Exception e) {
+      final String message =
+          String.format(
+              "Failed to create function %s(%s), URI: %s, because %s.",
+              udfName, className, uris, e.getMessage());
+      LOGGER.error(message, e);
+      future.setException(
+          new StatementExecutionException(
+              new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+                  .setMessage(message)));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 8426e8bbfe..f01b7b1a61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -99,7 +100,9 @@ import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountDevicesContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountNodesContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountStorageGroupContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountTimeseriesContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CreateFunctionContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.ExpressionContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.UriContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParserBaseVisitor;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -112,6 +115,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -595,6 +600,30 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return new CountStorageGroupStatement(path);
   }
 
+  // Create Function
+  /**
+   * Builds a {@link CreateFunctionStatement} from the parsed function name, class name and URI
+   * list of the given parse-tree context.
+   */
+  @Override
+  public Statement visitCreateFunction(CreateFunctionContext ctx) {
+    final String udfName = parseIdentifier(ctx.udfName.getText());
+    final String className = parseStringLiteral(ctx.className.getText());
+    final List<URI> uris = parseUris(ctx.uri());
+    return new CreateFunctionStatement(udfName, className, uris);
+  }
+
+  /**
+   * Converts the URI nodes of the parse tree into {@link URI} objects.
+   *
+   * @param uriContexts URI nodes of the parse tree; may be {@code null}
+   * @return the parsed URIs in input order, or an empty list when {@code uriContexts} is {@code
+   *     null}
+   * @throws SemanticException if one of the literals is not a syntactically valid URI
+   */
+  private List<URI> parseUris(List<UriContext> uriContexts) {
+    final List<URI> parsedUris = new ArrayList<>();
+    if (uriContexts == null) {
+      return parsedUris;
+    }
+
+    for (UriContext context : uriContexts) {
+      final String rawText = context.getText();
+      try {
+        parsedUris.add(new URI(parseStringLiteral(rawText)));
+      } catch (URISyntaxException e) {
+        throw new SemanticException(String.format("'%s' is not a legal URI.", rawText));
+      }
+    }
+    return parsedUris;
+  }
+
   // Show Child Paths =====================================================================
   @Override
   public Statement visitShowChildPaths(IoTDBSqlParser.ShowChildPathsContext ctx) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index bdbd99f766..650d15d160 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
@@ -126,6 +127,11 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(showTTLStatement, context);
   }
 
+  // UDF
+  /**
+   * Visits a create-function statement. This default implementation delegates to {@code
+   * visitStatement}.
+   */
+  public R visitCreateFunction(CreateFunctionStatement createFunctionStatement, C context) {
+    return visitStatement(createFunctionStatement, context);
+  }
+
   /** Data Manipulation Language (DML) */
 
   // Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateFunctionStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateFunctionStatement.java
new file mode 100644
index 0000000000..990c9b7fb1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateFunctionStatement.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Statement carrying the function name, the implementing class name and the executable URIs of a
+ * create-function request.
+ */
+public class CreateFunctionStatement extends Statement implements IConfigStatement {
+
+  private final String udfName;
+  private final String className;
+  private final List<URI> uris;
+
+  /**
+   * @param udfName name of the function to create
+   * @param className name of the class implementing the function
+   * @param uris URIs given in the statement; the list is stored as passed, without copying
+   */
+  public CreateFunctionStatement(String udfName, String className, List<URI> uris) {
+    this.udfName = udfName;
+    this.className = className;
+    this.uris = uris;
+    this.statementType = StatementType.CREATE_FUNCTION;
+  }
+
+  /** Returns the name of the function to create. */
+  public String getUdfName() {
+    return this.udfName;
+  }
+
+  /** Returns the name of the class implementing the function. */
+  public String getClassName() {
+    return this.className;
+  }
+
+  /** Returns the URIs given in the statement. */
+  public List<URI> getUris() {
+    return this.uris;
+  }
+
+  /** Dispatches to {@link StatementVisitor#visitCreateFunction}. */
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitCreateFunction(this, context);
+  }
+
+  /** This statement is always reported as a write. */
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  /** This statement reports no paths; the result is always an empty list. */
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 05ad28efcf..1fd483360d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -277,16 +278,8 @@ public class DataNode implements DataNodeMBean {
         .getConfig()
         .setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
 
-    registerManager.register(TemporaryQueryDataFileService.getInstance());
-    registerManager.register(
-        UDFClassLoaderManager.setupAndGetInstance(
-            IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
-    registerManager.register(
-        UDFRegistrationService.setupAndGetInstance(
-            IoTDBDescriptor.getInstance().getConfig().getSystemDir()
-                + File.separator
-                + "udf"
-                + File.separator));
+    registerUdfServices();
+
     registerManager.register(ReceiverService.getInstance());
     registerManager.register(MetricsService.getInstance());
 
@@ -328,6 +321,23 @@ public class DataNode implements DataNodeMBean {
     logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
   }
 
+  /**
+   * Registers the temporary query data file service followed by the UDF services, in this order:
+   * executable manager, class loader manager, registration service.
+   *
+   * @throws StartupException if one of the registrations fails
+   */
+  private void registerUdfServices() throws StartupException {
+    registerManager.register(TemporaryQueryDataFileService.getInstance());
+
+    final String temporaryLibDir = IoTDBDescriptor.getInstance().getConfig().getTemporaryLibDir();
+    final String udfDir = IoTDBDescriptor.getInstance().getConfig().getUdfDir();
+    registerManager.register(UDFExecutableManager.setupAndGetInstance(temporaryLibDir, udfDir));
+
+    registerManager.register(UDFClassLoaderManager.setupAndGetInstance(udfDir));
+
+    final String udfSystemDir =
+        IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+            + File.separator
+            + "udf"
+            + File.separator;
+    registerManager.register(UDFRegistrationService.setupAndGetInstance(udfSystemDir));
+  }
+
   private void initConfigManager() {
     long time = System.currentTimeMillis();
     IoTDB.configManager.init();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 4c7f08edf0..fb6db492dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -72,6 +74,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
@@ -354,5 +357,22 @@ public class InternalServiceImpl implements InternalService.Iface {
     return consensusImpl.write(consensusGroupId, fragmentInstance).getStatus();
   }
 
+  /**
+   * Registers the function described by the request with the local {@link
+   * UDFRegistrationService}.
+   *
+   * @param request carries the function name, class name and executable URIs
+   * @return a success status, or an {@code EXECUTE_STATEMENT_ERROR} status whose message is the
+   *     message of the exception that was thrown
+   */
+  @Override
+  public TSStatus createFunction(TCreateFunctionRequest request) {
+    try {
+      final UDFRegistrationService registrationService = UDFRegistrationService.getInstance();
+      registrationService.register(
+          request.getUdfName(),
+          request.getClassName(),
+          request.getUris(),
+          UDFExecutableManager.getInstance(),
+          true);
+    } catch (Exception e) {
+      final TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      return failure.setMessage(e.getMessage());
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   public void handleClientExit() {}
 }
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index f66e743b96..ab90f5bb33 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -211,6 +211,13 @@ struct TConfigNodeRegisterResp {
   3: optional list<common.TConfigNodeLocation> configNodeList
 }
 
+// UDF
+// Argument of ConfigIService.createFunction: function name, implementing class name and
+// executable URIs (as strings).
+struct TCreateFunctionReq {
+  1: required string udfName
+  2: required string className
+  3: required list<string> uris
+}
+
 service ConfigIService {
 
   /* DataNode */
@@ -270,4 +277,8 @@ service ConfigIService {
   TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req)
 
   common.TSStatus applyConfigNode(common.TConfigNodeLocation configNodeLocation)
+
+  /* UDF */
+
+  common.TSStatus createFunction(TCreateFunctionReq req)
 }
\ No newline at end of file
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index d55520a9f4..eaecf45dc7 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -135,7 +135,13 @@ struct TSchemaFetchResponse {
   1: required binary serializedSchemaTree
 }
 
-struct TInvalidatePermissionCacheReq{
+// Argument of InternalService.createFunction: function name, implementing class name and
+// executable URIs (as strings).
+struct TCreateFunctionRequest {
+  1: required string udfName
+  2: required string className
+  3: required list<string> uris
+}
+
+struct TInvalidatePermissionCacheReq {
   1: required string username
   2: required string roleName
 }
@@ -216,6 +222,13 @@ service InternalService {
   **/
   common.THeartbeatResp getHeartBeat(common.THeartbeatReq req)
 
+  /**
+   * Config node will create a function on a list of data nodes.
+   *
+   * @param function name, function class name, and executable uris
+   **/
+  common.TSStatus createFunction(TCreateFunctionRequest req)
+
   /**
    * Config node will invalidate permission Info cache.
    *