You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/09/27 01:27:31 UTC

[iotdb] branch master updated: [IOTDB-4491] Achieve idempotence of Create Trigger On DataNode (#7409)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 85329a92a8 [IOTDB-4491] Achieve idempotence of Create Trigger On DataNode (#7409)
85329a92a8 is described below

commit 85329a92a89ac8c05b68fc9cb40fd96b7001635b
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Tue Sep 27 09:27:25 2022 +0800

    [IOTDB-4491] Achieve idempotence of Create Trigger On DataNode (#7409)
---
 .../commons/executable/ExecutableManager.java      | 107 ++++++++++---
 .../apache/iotdb/commons/service/ServiceType.java  |   3 -
 .../iotdb/commons/trigger/TriggerInformation.java  |  17 +-
 .../apache/iotdb/commons/trigger/TriggerTable.java |  25 ++-
 .../trigger/service/TriggerExecutableManager.java  |  34 +---
 .../commons/udf/service/UDFExecutableManager.java  |   6 +
 .../udf/service/UDFRegistrationService.java        |   6 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../engine/trigger/executor/TriggerExecutor.java   |   2 +-
 .../trigger/service/TriggerClassLoaderManager.java |   2 +-
 .../service/TriggerRegistrationService.java        |   2 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  84 +++++++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |  17 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  15 +-
 .../db}/trigger/service/TriggerClassLoader.java    |   2 +-
 .../trigger/service/TriggerClassLoaderManager.java |  55 +++----
 .../trigger/service/TriggerManagementService.java  | 176 +++++++++++----------
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../src/main/thrift/confignode.thrift              |   4 +-
 thrift/src/main/thrift/datanode.thrift             |   8 +-
 .../iotdb/trigger/api/enums/FailureStrategy.java   |  37 +++--
 21 files changed, 376 insertions(+), 230 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4be42361ab..b069c6365a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -59,27 +60,39 @@ public class ExecutableManager {
   public ExecutableResource request(List<String> uris) throws URISyntaxException, IOException {
     final long requestId = generateNextRequestId();
     downloadExecutables(uris, requestId);
-    return new ExecutableResource(requestId, getDirStringByRequestId(requestId));
+    return new ExecutableResource(requestId, getDirStringUnderTempRootByRequestId(requestId));
   }
 
-  public void moveToExtLibDir(ExecutableResource resource, String name) throws IOException {
-    FileUtils.moveDirectory(getDirByRequestId(resource.getRequestId()), getDirByName(name));
+  public void moveTempDirToExtLibDir(ExecutableResource resource, String name) throws IOException {
+    FileUtils.moveDirectory(
+        getDirUnderTempRootByRequestId(resource.getRequestId()), getDirUnderLibRootByName(name));
   }
 
-  public void removeFromTemporaryLibRoot(ExecutableResource resource) {
-    removeFromTemporaryLibRoot(resource.getRequestId());
+  public void moveFileUnderTempRootToExtLibDir(ExecutableResource resource, String name)
+      throws IOException {
+    FileUtils.moveFileToDirectory(
+        getFileByFullPath(
+            getDirStringUnderTempRootByRequestId(resource.getRequestId()) + File.separator + name),
+        getFileByFullPath(libRoot),
+        false);
+  }
+
+  public void copyFileToExtLibDir(String filePath) throws IOException {
+    FileUtils.copyFileToDirectory(
+        FSFactoryProducer.getFSFactory().getFile(filePath),
+        FSFactoryProducer.getFSFactory().getFile(this.libRoot));
   }
 
-  public void removeFromExtLibDir(String functionName) {
-    FileUtils.deleteQuietly(getDirByName(functionName));
+  public void removeFromTemporaryLibRoot(ExecutableResource resource) {
+    removeFromTemporaryLibRoot(resource.getRequestId());
   }
 
   private synchronized long generateNextRequestId() throws IOException {
     long requestId = requestCounter.getAndIncrement();
-    while (FileUtils.isDirectory(getDirByRequestId(requestId))) {
+    while (FileUtils.isDirectory(getDirUnderTempRootByRequestId(requestId))) {
       requestId = requestCounter.getAndIncrement();
     }
-    FileUtils.forceMkdir(getDirByRequestId(requestId));
+    FileUtils.forceMkdir(getDirUnderTempRootByRequestId(requestId));
     return requestId;
   }
 
@@ -101,29 +114,42 @@ public class ExecutableManager {
   }
 
   private void removeFromTemporaryLibRoot(long requestId) {
-    FileUtils.deleteQuietly(getDirByRequestId(requestId));
+    FileUtils.deleteQuietly(getDirUnderTempRootByRequestId(requestId));
   }
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // dir string and dir file generation
   /////////////////////////////////////////////////////////////////////////////////////////////////
 
-  public File getDirByRequestId(long requestId) {
-    return FSFactoryProducer.getFSFactory().getFile(getDirStringByRequestId(requestId));
+  public File getDirUnderTempRootByRequestId(long requestId) {
+    return FSFactoryProducer.getFSFactory()
+        .getFile(getDirStringUnderTempRootByRequestId(requestId));
   }
 
-  public String getDirStringByRequestId(long requestId) {
+  public String getDirStringUnderTempRootByRequestId(long requestId) {
     return temporaryLibRoot + File.separator + requestId + File.separator;
   }
 
-  public File getDirByName(String name) {
-    return FSFactoryProducer.getFSFactory().getFile(getDirStringByName(name));
+  public File getDirUnderLibRootByName(String name) {
+    return FSFactoryProducer.getFSFactory().getFile(getDirStringUnderLibRootByName(name));
   }
 
-  public String getDirStringByName(String name) {
+  public String getDirStringUnderLibRootByName(String name) {
     return libRoot + File.separator + name + File.separator;
   }
 
+  public File getFileUnderLibRootByName(String name) {
+    return FSFactoryProducer.getFSFactory().getFile(getFileStringUnderLibRootByName(name));
+  }
+
+  public String getFileStringUnderLibRootByName(String name) {
+    return libRoot + File.separator + name;
+  }
+
+  private File getFileByFullPath(String path) {
+    return FSFactoryProducer.getFSFactory().getFile(path);
+  }
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // transfer jar file to bytebuffer for thrift
   /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -138,12 +164,11 @@ public class ExecutableManager {
       }
       ByteBuffer byteBuffer = ByteBuffer.allocate((int) size);
       fileChannel.read(byteBuffer);
+      byteBuffer.flip();
       return byteBuffer;
     } catch (Exception e) {
       LOGGER.warn(
-          "Error occurred during transferring file{} to ByteBuffer, the cause is {}",
-          filePath,
-          e.getMessage());
+          "Error occurred during transferring file{} to ByteBuffer, the cause is {}", filePath, e);
       throw e;
     }
   }
@@ -156,14 +181,48 @@ public class ExecutableManager {
     String destination = this.libRoot + File.separator + fileName;
     Path path = Paths.get(destination);
     Files.deleteIfExists(path);
-    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE)) {
-      fileChannel.write(byteBuffer);
+    Files.createFile(path);
+    try (FileOutputStream outputStream = new FileOutputStream(destination)) {
+      outputStream.getChannel().write(byteBuffer);
     } catch (IOException e) {
       LOGGER.warn(
-          "Error occurred during writing bytebuffer to {} , the cause is {}",
-          destination,
-          e.getMessage());
+          "Error occurred during writing bytebuffer to {} , the cause is {}", destination, e);
       throw e;
     }
   }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // other functions
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * @param fileName given file name
+   * @return true if file exists under LibRoot
+   */
+  public boolean hasFileUnderLibRoot(String fileName) {
+    return Files.exists(Paths.get(this.libRoot + File.separator + fileName));
+  }
+
+  public boolean hasFileUnderTemporaryRoot(String fileName) {
+    return Files.exists(Paths.get(this.temporaryLibRoot + File.separator + fileName));
+  }
+
+  public void saveTextAsFileUnderTemporaryRoot(String text, String fileName) throws IOException {
+    Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+    Files.deleteIfExists(path);
+    Files.write(path, text.getBytes());
+  }
+
+  public String readTextFromFileUnderTemporaryRoot(String fileName) throws IOException {
+    Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+    return new String(Files.readAllBytes(path));
+  }
+
+  public String getTemporaryLibRoot() {
+    return temporaryLibRoot;
+  }
+
+  public String getLibRoot() {
+    return libRoot;
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 239ef700be..670fddf37f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -44,9 +44,6 @@ public enum ServiceType {
   UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),
   UDF_REGISTRATION_SERVICE("UDF Registration Service", ""),
   UDF_EXECUTABLE_MANAGER_SERVICE("UDF Executable Manager Service", ""),
-  TRIGGER_CLASSLOADER_MANAGER_SERVICE("Trigger ClassLoader Manager Service", ""),
-  TRIGGER_REGISTRATION_SERVICE("Trigger Registration Service", ""),
-  TRIGGER_EXECUTABLE_MANAGER_SERVICE("Trigger Executable Manager Service", ""),
   TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
   TRIGGER_REGISTRATION_SERVICE_OLD("Old Standalone Trigger Registration Service", ""),
   CACHE_HIT_RATIO_DISPLAY_SERVICE(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index f85faf4bdb..0d3649ff6c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -49,6 +49,9 @@ public class TriggerInformation {
   /** only used for Stateful Trigger */
   private TDataNodeLocation dataNodeLocation;
 
+  /** MD5 of the Jar File */
+  private String jarFileMD5;
+
   public TriggerInformation() {};
 
   public TriggerInformation(
@@ -60,7 +63,8 @@ public class TriggerInformation {
       TriggerEvent event,
       TTriggerState triggerState,
       boolean isStateful,
-      TDataNodeLocation dataNodeLocation) {
+      TDataNodeLocation dataNodeLocation,
+      String jarFileMD5) {
     this.pathPattern = pathPattern;
     this.triggerName = triggerName;
     this.className = className;
@@ -70,6 +74,7 @@ public class TriggerInformation {
     this.triggerState = triggerState;
     this.isStateful = isStateful;
     this.dataNodeLocation = dataNodeLocation;
+    this.jarFileMD5 = jarFileMD5;
   }
 
   public ByteBuffer serialize() throws IOException {
@@ -91,6 +96,7 @@ public class TriggerInformation {
     if (isStateful) {
       ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, outputStream);
     }
+    ReadWriteIOUtils.write(jarFileMD5, outputStream);
   }
 
   public static TriggerInformation deserialize(ByteBuffer byteBuffer) {
@@ -109,6 +115,7 @@ public class TriggerInformation {
       triggerInformation.dataNodeLocation =
           ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
     }
+    triggerInformation.jarFileMD5 = ReadWriteIOUtils.readString(byteBuffer);
     return triggerInformation;
   }
 
@@ -179,4 +186,12 @@ public class TriggerInformation {
   public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
     this.dataNodeLocation = dataNodeLocation;
   }
+
+  public String getJarFileMD5() {
+    return jarFileMD5;
+  }
+
+  public void setJarFileMD5(String jarFileMD5) {
+    this.jarFileMD5 = jarFileMD5;
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index effd6ca1be..cee7bb6b4f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -30,10 +30,6 @@ import java.util.Map;
 public class TriggerTable {
   private final Map<String, TriggerInformation> triggerTable;
 
-  // todo: Maintain a PatternTree: PathPattern -> List<String> triggerNames
-  // Given a PathPattern, return the triggerNames of triggers whose PathPatterns match the given
-  // one.
-
   public TriggerTable() {
     triggerTable = new HashMap<>();
   }
@@ -45,6 +41,15 @@ public class TriggerTable {
   public void addTriggerInformation(String triggerName, TriggerInformation triggerInformation) {
     triggerTable.put(triggerName, triggerInformation);
   }
+
+  public TriggerInformation getTriggerInformation(String triggerName) {
+    return triggerTable.get(triggerName);
+  }
+
+  public void setTriggerInformation(String triggerName, TriggerInformation triggerInformation) {
+    triggerTable.put(triggerName, triggerInformation);
+  }
+
   // for dropTrigger
   public void deleteTriggerInformation(String triggerName) {
     triggerTable.remove(triggerName);
@@ -54,16 +59,8 @@ public class TriggerTable {
     return triggerTable.containsKey(triggerName);
   }
 
-  public void activeTrigger(String triggerName) {
-    triggerTable.get(triggerName).setTriggerState(TTriggerState.ACTIVE);
-  }
-
-  public TriggerInformation getTriggerInformation(String triggerName) {
-    return triggerTable.get(triggerName);
-  }
-
-  public void setTriggerInformation(String triggerName, TriggerInformation triggerInformation) {
-    triggerTable.put(triggerName, triggerInformation);
+  public void setTriggerState(String triggerName, TTriggerState triggerState) {
+    triggerTable.get(triggerName).setTriggerState(triggerState);
   }
 
   // for showTrigger
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java
index 25f925b80c..2996ee55a5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java
@@ -19,40 +19,16 @@
 
 package org.apache.iotdb.commons.trigger.service;
 
-import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.executable.ExecutableManager;
 import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
 
-public class TriggerExecutableManager extends ExecutableManager implements IService {
+import java.io.IOException;
+
+public class TriggerExecutableManager extends ExecutableManager {
   private TriggerExecutableManager(String temporaryLibRoot, String triggerLibRoot) {
     super(temporaryLibRoot, triggerLibRoot);
   }
 
-  /////////////////////////////////////////////////////////////////////////////////////////////////
-  // IService
-  /////////////////////////////////////////////////////////////////////////////////////////////////
-  @Override
-  public void start() throws StartupException {
-    try {
-      SystemFileFactory.INSTANCE.makeDirIfNecessary(temporaryLibRoot);
-      SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
-    } catch (Exception e) {
-      throw new StartupException(e);
-    }
-  }
-
-  @Override
-  public void stop() {
-    // nothing to do
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.TRIGGER_EXECUTABLE_MANAGER_SERVICE;
-  }
-
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // singleton instance holder
   /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -60,8 +36,10 @@ public class TriggerExecutableManager extends ExecutableManager implements IServ
   private static TriggerExecutableManager INSTANCE = null;
 
   public static synchronized TriggerExecutableManager setupAndGetInstance(
-      String temporaryLibRoot, String triggerLibRoot) {
+      String temporaryLibRoot, String triggerLibRoot) throws IOException {
     if (INSTANCE == null) {
+      SystemFileFactory.INSTANCE.makeDirIfNecessary(temporaryLibRoot);
+      SystemFileFactory.INSTANCE.makeDirIfNecessary(triggerLibRoot);
       INSTANCE = new TriggerExecutableManager(temporaryLibRoot, triggerLibRoot);
     }
     return INSTANCE;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index a1cf6f34c8..6ea6f840c0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 
+import org.apache.commons.io.FileUtils;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -35,6 +37,10 @@ public class UDFExecutableManager extends ExecutableManager implements IService,
     super(temporaryLibRoot, udfLibRoot);
   }
 
+  public void removeUDFJarFromExtLibDir(String functionName) {
+    FileUtils.deleteQuietly(getDirUnderLibRootByName(functionName));
+  }
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // IService
   /////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index c0bf6a5d3e..14bb0b510e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -173,10 +173,10 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
     try {
       final ExecutableResource resource = udfExecutableManager.request(uris);
       try {
-        udfExecutableManager.removeFromExtLibDir(functionName);
-        udfExecutableManager.moveToExtLibDir(resource, functionName);
+        udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
+        udfExecutableManager.moveTempDirToExtLibDir(resource, functionName);
       } catch (Exception innerException) {
-        udfExecutableManager.removeFromExtLibDir(functionName);
+        udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
         udfExecutableManager.removeFromTemporaryLibRoot(resource);
         throw innerException;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 50d69dc5aa..8f7787c7c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -270,7 +270,7 @@ public class IoTDBConfig {
 
   /** External temporary lib directory for storing downloaded trigger JAR files */
   private String triggerTemporaryLibDir =
-      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.UDF_TMP_FOLDER_NAME;
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TRIGGER_TMP_FOLDER_NAME;
 
   /** External lib directory for ext Pipe plugins, stores user-defined JAR files */
   private String extPipeDir =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
index b782ad5777..b9a4695f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
@@ -20,13 +20,13 @@
 package org.apache.iotdb.db.engine.trigger.executor;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.engine.trigger.api.Trigger;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationInformation;
 import org.apache.iotdb.db.exception.TriggerExecutionException;
 import org.apache.iotdb.db.exception.TriggerManagementException;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.trigger.api.TriggerAttributes;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
index 400d2adcbb..dd22241f35 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.engine.trigger.service;
 
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.TriggerManagementException;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
index f555c11376..91caddc384 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -44,6 +43,7 @@ import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 670f6dfc0e..808dbf79d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -27,9 +27,11 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.executable.ExecutableManager;
+import org.apache.iotdb.commons.executable.ExecutableResource;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.trigger.TriggerTable;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -95,11 +97,15 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
 
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
@@ -108,10 +114,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.SocketTimeoutException;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -292,12 +304,80 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
               createTriggerStatement.getTriggerEvent().getId(),
               createTriggerStatement.getTriggerType().getId(),
               createTriggerStatement.getPathPattern().serialize(),
-              createTriggerStatement.getAttributes());
+              createTriggerStatement.getAttributes(),
+              FailureStrategy.OPTIMISTIC.getId()); // set default strategy
 
-      if (!createTriggerStatement.isUsingURI()) {
+      String libRoot = TriggerExecutableManager.getInstance().getLibRoot();
+      if (createTriggerStatement.isUsingURI()) {
+        try {
+          // download executable
+          ExecutableResource resource =
+              TriggerExecutableManager.getInstance()
+                  .request(Collections.singletonList(createTriggerStatement.getJarPath()));
+          String uriString = createTriggerStatement.getJarPath();
+          String jarFileName = uriString.substring(uriString.lastIndexOf("/") + 1);
+          // move to ext
+          TriggerExecutableManager.getInstance()
+              .moveFileUnderTempRootToExtLibDir(resource, jarFileName);
+          tCreateTriggerReq.setJarPath(jarFileName);
+          // jarFilePath after moving to ext lib
+          String jarFilePathUnderLib =
+              TriggerExecutableManager.getInstance().getFileStringUnderLibRootByName(jarFileName);
+          tCreateTriggerReq.setJarFile(ExecutableManager.transferToBytebuffer(jarFilePathUnderLib));
+          tCreateTriggerReq.setJarMD5(
+              DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderLib))));
+
+        } catch (IOException | URISyntaxException e) {
+          LOGGER.warn(
+              "Failed to download executable for trigger({}) using URI: {}, the cause is: {}",
+              createTriggerStatement.getTriggerName(),
+              createTriggerStatement.getJarPath(),
+              e);
+          future.setException(
+              new IoTDBException(
+                  "Failed to download executable for trigger '"
+                      + createTriggerStatement.getTriggerName()
+                      + "'",
+                  TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
+          return future;
+        }
+      } else {
+        // change libRoot
+        libRoot = createTriggerStatement.getJarPath();
+        // set jarPath to file name instead of the full path
+        tCreateTriggerReq.setJarPath(new File(createTriggerStatement.getJarPath()).getName());
         // If jarPath is a file path, we transfer it to ByteBuffer and send it to ConfigNode.
         tCreateTriggerReq.setJarFile(
             ExecutableManager.transferToBytebuffer(createTriggerStatement.getJarPath()));
+        // set md5 of the jar file
+        tCreateTriggerReq.setJarMD5(
+            DigestUtils.md5Hex(
+                Files.newInputStream(Paths.get(createTriggerStatement.getJarPath()))));
+      }
+
+      // try to create instance, this request will fail if creation is not successful
+      try (TriggerClassLoader classLoader = new TriggerClassLoader(libRoot)) {
+        Class<?> triggerClass =
+            Class.forName(createTriggerStatement.getClassName(), true, classLoader);
+        Trigger trigger = (Trigger) triggerClass.getDeclaredConstructor().newInstance();
+        tCreateTriggerReq.setFailureStrategy(trigger.getFailureStrategy().getId());
+      } catch (ClassNotFoundException
+          | NoSuchMethodException
+          | InstantiationException
+          | IllegalAccessException
+          | InvocationTargetException e) {
+        LOGGER.warn(
+            "Failed to create trigger when try to create trigger({}) instance first, the cause is: {}",
+            createTriggerStatement.getTriggerName(),
+            e);
+        future.setException(
+            new IoTDBException(
+                "Failed to load class '"
+                    + createTriggerStatement.getClassName()
+                    + "', because it's not found in jar file: "
+                    + createTriggerStatement.getJarPath(),
+                TSStatusCode.TRIGGER_LOAD_CLASS.getStatusCode()));
+        return future;
       }
 
       final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 55f92de243..e25b510c6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.StartupChecks;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -65,7 +64,6 @@ import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
 import org.apache.iotdb.db.sync.SyncService;
-import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -291,7 +289,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(DriverScheduler.getInstance());
 
     registerUdfServices();
-    registerTriggerServices();
+    initTriggerRelatedInstance();
 
     logger.info(
         "IoTDB DataNode is setting up, some storage groups may not be ready now, please wait several seconds...");
@@ -378,12 +376,13 @@ public class DataNode implements DataNodeMBean {
             config.getSystemDir() + File.separator + "udf" + File.separator));
   }
 
-  private void registerTriggerServices() throws StartupException {
-    registerManager.register(
-        TriggerExecutableManager.setupAndGetInstance(
-            config.getTriggerTemporaryLibDir(), config.getTriggerDir()));
-    registerManager.register(TriggerClassLoaderManager.setupAndGetInstance(config.getTriggerDir()));
-    registerManager.register(TriggerManagementService.setupAndGetInstance());
+  private void initTriggerRelatedInstance() throws StartupException {
+    try {
+      TriggerExecutableManager.setupAndGetInstance(
+          config.getTriggerTemporaryLibDir(), config.getTriggerDir());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
   }
 
   private void initSchemaEngine() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 4d434f5a3a..f28d85768e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.type.Gauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -96,6 +97,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
@@ -126,8 +128,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TcreateTriggerInstanceReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -925,7 +925,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus createTriggerInstance(TcreateTriggerInstanceReq req) throws TException {
+  public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) throws TException {
     TriggerInformation triggerInformation = TriggerInformation.deserialize(req.triggerInformation);
     try {
       // set state to INACTIVE when creating trigger instance
@@ -940,7 +940,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       LOGGER.warn(
           "Error occurred when creating trigger instance for trigger: {}. The cause is {}.",
           triggerInformation.getTriggerName(),
-          e.getMessage());
+          e);
       return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
           .setMessage(e.getMessage());
     }
@@ -948,11 +948,14 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req) throws TException {
+  public TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req) throws TException {
     try {
       TriggerManagementService.getInstance().activeTrigger(req.triggerName);
     } catch (Exception e) {
-      LOGGER.error("Error occurred during ");
+      LOGGER.error(
+          "Error occurred during active trigger instance for trigger: {}. The cause is {}.",
+          req.triggerName,
+          e);
       return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
           .setMessage(e.getMessage());
     }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoader.java
similarity index 97%
rename from node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java
rename to server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoader.java
index 9356c23f73..b344ccb338 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.trigger.service;
+package org.apache.iotdb.db.trigger.service;
 
 import org.apache.iotdb.commons.file.SystemFileFactory;
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoaderManager.java
similarity index 63%
rename from node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java
rename to server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoaderManager.java
index f94d343ef9..e82c82e558 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoaderManager.java
@@ -17,21 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.trigger.service;
+package org.apache.iotdb.db.trigger.service;
 
-import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class TriggerClassLoaderManager implements IService {
+public class TriggerClassLoaderManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerClassLoaderManager.class);
 
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
   /** The dir that stores jar files. */
   private final String libRoot;
 
@@ -42,10 +43,10 @@ public class TriggerClassLoaderManager implements IService {
    */
   private volatile TriggerClassLoader activeClassLoader;
 
-  private TriggerClassLoaderManager(String libRoot) {
+  private TriggerClassLoaderManager(String libRoot) throws IOException {
     this.libRoot = libRoot;
     LOGGER.info("Trigger lib root: {}", libRoot);
-    activeClassLoader = null;
+    activeClassLoader = new TriggerClassLoader(libRoot);
   }
 
   /** Call this method to get up-to-date ClassLoader before registering triggers */
@@ -60,44 +61,24 @@ public class TriggerClassLoaderManager implements IService {
     return activeClassLoader;
   }
 
-  /////////////////////////////////////////////////////////////////////////////////////////////////
-  // IService
-  /////////////////////////////////////////////////////////////////////////////////////////////////
-
-  @Override
-  public void start() throws StartupException {
-    try {
-      SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
-      activeClassLoader = new TriggerClassLoader(libRoot);
-    } catch (IOException e) {
-      throw new StartupException(this.getID().getName(), e.getMessage());
-    }
-  }
-
-  @Override
-  public void stop() {
-    // nothing to do
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.TRIGGER_CLASSLOADER_MANAGER_SERVICE;
-  }
-
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // singleton instance holder
   /////////////////////////////////////////////////////////////////////////////////////////////////
 
-  private static TriggerClassLoaderManager INSTANCE = null;
+  private static class TriggerClassLoaderManagerHolder {
+    private static final TriggerClassLoaderManager INSTANCE;
 
-  public static synchronized TriggerClassLoaderManager setupAndGetInstance(String libRoot) {
-    if (INSTANCE == null) {
-      INSTANCE = new TriggerClassLoaderManager(libRoot);
+    static {
+      try {
+        SystemFileFactory.INSTANCE.makeDirIfNecessary(CONFIG.getTriggerDir());
+        INSTANCE = new TriggerClassLoaderManager(CONFIG.getTriggerDir());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
-    return INSTANCE;
   }
 
   public static TriggerClassLoaderManager getInstance() {
-    return INSTANCE;
+    return TriggerClassLoaderManagerHolder.INSTANCE;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index f3904fd71d..319b8bc309 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -19,30 +19,29 @@
 
 package org.apache.iotdb.db.trigger.service;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.TriggerTable;
 import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
-import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
 import org.apache.iotdb.trigger.api.Trigger;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class TriggerManagementService implements IService {
+public class TriggerManagementService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManagementService.class);
 
@@ -52,9 +51,7 @@ public class TriggerManagementService implements IService {
 
   private final Map<String, TriggerExecutor> executorMap;
 
-  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
-
-  private TDataNodeLocation tDataNodeLocationCache;
+  private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
 
   private TriggerManagementService() {
     this.lock = new ReentrantLock();
@@ -62,55 +59,108 @@ public class TriggerManagementService implements IService {
     this.executorMap = new ConcurrentHashMap<>();
   }
 
-  public void acquireRegistrationLock() {
+  public void acquireLock() {
     lock.lock();
   }
 
-  public void releaseRegistrationLock() {
+  public void releaseLock() {
     lock.unlock();
   }
 
-  public void register(TriggerInformation triggerInformation) {
+  public void register(TriggerInformation triggerInformation) throws IOException {
     try {
-      acquireRegistrationLock();
+      acquireLock();
       checkIfRegistered(triggerInformation);
       doRegister(triggerInformation);
-    } catch (Exception e) {
-      LOGGER.warn(
-          "Failed to register trigger({}) on data node, the cause is: {}",
-          triggerInformation.getTriggerName(),
-          e.getMessage());
     } finally {
-      releaseRegistrationLock();
+      releaseLock();
     }
-  };
+  }
 
   public void activeTrigger(String triggerName) {
-    triggerTable.activeTrigger(triggerName);
-  };
+    try {
+      acquireLock();
+      triggerTable.setTriggerState(triggerName, TTriggerState.ACTIVE);
+    } finally {
+      releaseLock();
+    }
+  }
 
-  private void checkIfRegistered(TriggerInformation triggerInformation)
-      throws TriggerManagementException {
+  public void inactiveTrigger(String triggerName) {
+    try {
+      acquireLock();
+      triggerTable.setTriggerState(triggerName, TTriggerState.INACTIVE);
+    } finally {
+      releaseLock();
+    }
+  }
+
+  private void checkIfRegistered(TriggerInformation triggerInformation) throws IOException {
     String triggerName = triggerInformation.getTriggerName();
     if (triggerTable.containsTrigger(triggerName)) {
-      String errorMessage =
-          String.format(
-              "Failed to registered trigger %s, "
-                  + "because trigger %s has already been registered in TriggerTable",
-              triggerName, triggerName);
-      LOGGER.warn(errorMessage);
-      throw new TriggerManagementException(errorMessage);
+      String jarName = triggerInformation.getJarName();
+      if (TriggerExecutableManager.getInstance().hasFileUnderLibRoot(jarName)) {
+        // A jar with the same name exists, we need to check md5
+        String existedMd5 = "";
+        String md5FilePath = triggerName + ".txt";
+
+        // if meet error when reading md5 from txt, we need to compute it again
+        boolean hasComputed = false;
+        if (TriggerExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath)) {
+          try {
+            existedMd5 =
+                TriggerExecutableManager.getInstance()
+                    .readTextFromFileUnderTemporaryRoot(md5FilePath);
+            hasComputed = true;
+          } catch (IOException e) {
+            LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath);
+          }
+        }
+        if (!hasComputed) {
+          try {
+            existedMd5 =
+                DigestUtils.md5Hex(
+                    Files.newInputStream(
+                        Paths.get(
+                            TriggerExecutableManager.getInstance().getLibRoot()
+                                + File.separator
+                                + triggerInformation.getJarName())));
+            // save the md5 in a txt under trigger temporary lib
+            TriggerExecutableManager.getInstance()
+                .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
+          } catch (IOException e) {
+            String errorMessage =
+                String.format(
+                    "Failed to registered trigger %s, "
+                        + "because error occurred when trying to compute md5 of jar file for trigger %s ",
+                    triggerName, triggerName);
+            LOGGER.warn(errorMessage);
+            throw e;
+          }
+        }
+
+        if (!existedMd5.equals(triggerInformation.getJarFileMD5())) {
+          // same jar name with different md5
+          String errorMessage =
+              String.format(
+                  "Failed to registered trigger %s, "
+                      + "because existed md5 of jar file for trigger %s is different from the new jar file. ",
+                  triggerName, triggerName);
+          LOGGER.warn(errorMessage);
+          throw new TriggerManagementException(errorMessage);
+        }
+      }
     }
   }
 
-  private void doRegister(TriggerInformation triggerInformation) {
+  private void doRegister(TriggerInformation triggerInformation) throws IOException {
     try (TriggerClassLoader currentActiveClassLoader =
         TriggerClassLoaderManager.getInstance().updateAndGetActiveClassLoader()) {
       String triggerName = triggerInformation.getTriggerName();
       triggerTable.addTriggerInformation(triggerName, triggerInformation);
       // if it is a stateful trigger, we only create its instance on specified DataNode
       if (!triggerInformation.isStateful()
-          || triggerInformation.getDataNodeLocation().equals(getTDataNodeLocation())) {
+          || triggerInformation.getDataNodeLocation().getDataNodeId() == DATA_NODE_ID) {
         // get trigger instance
         Trigger trigger =
             constructTriggerInstance(triggerInformation.getClassName(), currentActiveClassLoader);
@@ -122,15 +172,13 @@ public class TriggerManagementService implements IService {
       String errorMessage =
           String.format(
               "Failed to register trigger %s with className: %s. The cause is: %s",
-              triggerInformation.getTriggerName(),
-              triggerInformation.getClassName(),
-              e.getMessage());
+              triggerInformation.getTriggerName(), triggerInformation.getClassName(), e);
       LOGGER.warn(errorMessage);
-      throw new TriggerManagementException(errorMessage);
+      throw e;
     }
   }
 
-  private Trigger constructTriggerInstance(String className, TriggerClassLoader classLoader)
+  public Trigger constructTriggerInstance(String className, TriggerClassLoader classLoader)
       throws TriggerManagementException {
     try {
       Class<?> triggerClass = Class.forName(className, true, classLoader);
@@ -146,52 +194,14 @@ public class TriggerManagementService implements IService {
     }
   }
 
-  private TDataNodeLocation getTDataNodeLocation() {
-    if (tDataNodeLocationCache == null) {
-      // Set DataNodeLocation
-      tDataNodeLocationCache = new TDataNodeLocation();
-      tDataNodeLocationCache.setDataNodeId(CONFIG.getDataNodeId());
-      tDataNodeLocationCache.setClientRpcEndPoint(
-          new TEndPoint(CONFIG.getRpcAddress(), CONFIG.getRpcPort()));
-      tDataNodeLocationCache.setInternalEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getInternalPort()));
-      tDataNodeLocationCache.setMPPDataExchangeEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getMppDataExchangePort()));
-      tDataNodeLocationCache.setDataRegionConsensusEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getDataRegionConsensusPort()));
-      tDataNodeLocationCache.setSchemaRegionConsensusEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getSchemaRegionConsensusPort()));
-    }
-    return tDataNodeLocationCache;
-  }
-
-  @Override
-  public void start() throws StartupException {}
-
-  @Override
-  public void stop() {
-    // nothing to do
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.TRIGGER_REGISTRATION_SERVICE;
-  }
-
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // singleton instance holder
   /////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static TriggerManagementService INSTANCE = null;
-
-  public static synchronized TriggerManagementService setupAndGetInstance() {
-    if (INSTANCE == null) {
-      INSTANCE = new TriggerManagementService();
-    }
-    return INSTANCE;
+  public static TriggerManagementService getInstance() {
+    return TriggerManagementServiceHolder.INSTANCE;
   }
 
-  public static TriggerManagementService getInstance() {
-    return INSTANCE;
+  private static class TriggerManagementServiceHolder {
+    private static final TriggerManagementService INSTANCE = new TriggerManagementService();
   }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 6aaeb8c335..2b010489d5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -75,6 +75,8 @@ public enum TSStatusCode {
   VERIFY_METADATA_ERROR(343),
   MEASUREMENT_IN_BLACK_LIST(344),
   COLOSSAL_RECORD(349),
+  TRIGGER_LOAD_CLASS(360),
+  TRIGGER_DOWNLOAD_ERROR(361),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 2f89ba994e..b956c66bc7 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -264,7 +264,9 @@ struct TCreateTriggerReq {
   6: required byte triggerType
   7: required binary pathPattern,
   8: required map<string, string> attributes,
-  9: optional binary jarFile
+  9: optional binary jarFile,
+  10: optional string jarMD5,
+  11: required i32 failureStrategy
 }
 
 struct TDropTriggerReq {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 2fc8d346ef..12d3a41ddb 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -169,12 +169,12 @@ struct TDropFunctionRequest {
   1: required string udfName
 }
 
-struct TcreateTriggerInstanceReq {
+struct TCreateTriggerInstanceReq {
   1: required binary triggerInformation
   2: required binary jarFile
 }
 
-struct TactiveTriggerInstanceReq {
+struct TActiveTriggerInstanceReq {
   1: required string triggerName
 }
 
@@ -421,14 +421,14 @@ service IDataNodeRPCService {
    *
    * @param TriggerInformation, jar file.
    **/
-  common.TSStatus createTriggerInstance(TcreateTriggerInstanceReq req)
+  common.TSStatus createTriggerInstance(TCreateTriggerInstanceReq req)
 
   /**
    * Config node will active a trigger instance on data node.
    *
    * @param trigger name.
    **/
-  common.TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req)
+  common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req)
 
   /**
     * Config node will drop a trigger on all online config nodes and data nodes.
diff --git a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
index 5920f33996..6f3faf342e 100644
--- a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
+++ b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.trigger.api.enums;
 
 import org.apache.iotdb.trigger.api.Trigger;
@@ -34,16 +33,34 @@ public enum FailureStrategy {
    * not have any influence on the triggers that have not been fired. The failure of this Trigger
    * will be simply ignored.
    */
-  OPTIMISTIC,
+  OPTIMISTIC(0),
 
   /**
-   * If this strategy were adopted, the failure of {@link Trigger#fire(Tablet)} of one Tablet would
-   * throw an exception and end this insertion. If a PESSIMISTIC trigger whose TRIGGER_EVENT is
-   * {@link TriggerEvent#BEFORE_INSERT} fails to fire in an insertion, all the triggers that have
-   * not fired will not be fired in this insertion and this insertion will not be executed. if a
-   * PESSIMISTIC trigger whose TRIGGER_EVENT is {@link TriggerEvent#AFTER_INSERT} fails to fire in
-   * an insertion, all the triggers that have not fired will not be fired, and this insertion will
-   * be marked as failed even if the insertion itself executed successfully.
+   * If this strategy were adopted, the failure of {@link Trigger#fire(Tablet)} of one Tablet
+   * would @@ -45,5 +45,26 @@ public enum FailureStrategy { an insertion, all the triggers that have
+   * not fired will not be fired, and this insertion will be marked as failed even if the insertion
+   * itself executed successfully.
    */
-  PESSIMISTIC,
+  PESSIMISTIC(1);
+
+  private final int id;
+
+  FailureStrategy(int id) {
+    this.id = id;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public static FailureStrategy construct(int id) {
+    switch (id) {
+      case 0:
+        return FailureStrategy.OPTIMISTIC;
+      case 1:
+        return FailureStrategy.PESSIMISTIC;
+      default:
+        throw new UnsupportedOperationException("Unsupported FailureStrategy Type.");
+    }
+  }
 }