You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/19 08:51:29 UTC

[iotdb] branch master updated: [IOTDB-4627]Trigger transfer (#7643)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 127263dd3b [IOTDB-4627]Trigger transfer (#7643)
127263dd3b is described below

commit 127263dd3b20d45bc6342e915cc6c15f918e6938
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Wed Oct 19 16:51:23 2022 +0800

    [IOTDB-4627]Trigger transfer (#7643)
---
 .../confignode/client/DataNodeRequestType.java     |  1 +
 .../client/async/AsyncDataNodeClientPool.java      |  7 ++
 .../client/async/handlers/AsyncClientHandler.java  |  1 +
 .../consensus/request/ConfigPhysicalPlan.java      | 12 +++
 .../consensus/request/ConfigPhysicalPlanType.java  |  5 +-
 ...ePlan.java => GetTransferringTriggersPlan.java} |  8 +-
 .../request/read/GetTriggerTablePlan.java          | 22 ++++-
 .../write/trigger/UpdateTriggerLocationPlan.java   | 76 ++++++++++++++++++
 .../trigger/UpdateTriggersOnTransferNodesPlan.java | 75 +++++++++++++++++
 .../TransferringTriggersResp.java}                 | 29 +++----
 .../iotdb/confignode/manager/ConfigManager.java    | 38 ++++++++-
 .../apache/iotdb/confignode/manager/IManager.java  |  3 +
 .../iotdb/confignode/manager/TriggerManager.java   | 93 +++++++++++++++++++++-
 .../iotdb/confignode/manager/node/NodeManager.java | 37 ++++++++-
 .../iotdb/confignode/persistence/TriggerInfo.java  | 50 ++++++++----
 .../persistence/executor/ConfigPlanExecutor.java   | 13 ++-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  3 -
 .../procedure/impl/CreateTriggerProcedure.java     |  5 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  5 ++
 .../request/ConfigPhysicalPlanSerDeTest.java       | 72 +++++++++++++++--
 .../apache/iotdb/commons/trigger/TriggerTable.java | 40 ++++++++++
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 16 ++++
 .../java/org/apache/iotdb/db/service/DataNode.java |  4 -
 .../impl/DataNodeInternalRPCServiceImpl.java       | 17 ++++
 .../trigger/service/TriggerManagementService.java  | 38 +++++++--
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../src/main/thrift/confignode.thrift              |  9 ++-
 thrift/src/main/thrift/datanode.thrift             | 12 +++
 28 files changed, 622 insertions(+), 70 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index e2e52f3613..5fa8b3b0fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -59,6 +59,7 @@ public enum DataNodeRequestType {
   DROP_TRIGGER_INSTANCE,
   ACTIVE_TRIGGER_INSTANCE,
   INACTIVE_TRIGGER_INSTANCE,
+  UPDATE_TRIGGER_LOCATION,
 
   /** Sync */
   PRE_CREATE_PIPE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 24865fddbe..f2736fe121 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -170,6 +171,12 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case UPDATE_TRIGGER_LOCATION:
+          client.updateTriggerLocation(
+              (TUpdateTriggerLocationReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         case MERGE:
         case FULL_MERGE:
           client.merge(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index a10ef2306c..d23bf29633 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -179,6 +179,7 @@ public class AsyncClientHandler<Q, R> {
       case DROP_TRIGGER_INSTANCE:
       case ACTIVE_TRIGGER_INSTANCE:
       case INACTIVE_TRIGGER_INSTANCE:
+      case UPDATE_TRIGGER_LOCATION:
       case MERGE:
       case FULL_MERGE:
       case FLUSH:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 2afc6be446..be83d6f453 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
 import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
 import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
@@ -72,7 +73,9 @@ import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchema
 import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
@@ -303,6 +306,15 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case GetSeriesSlotList:
           req = new GetSeriesSlotListPlan();
           break;
+        case UpdateTriggersOnTransferNodes:
+          req = new UpdateTriggersOnTransferNodesPlan();
+          break;
+        case UpdateTriggerLocation:
+          req = new UpdateTriggerLocationPlan();
+          break;
+        case GetTransferringTriggers:
+          req = new GetTransferringTriggersPlan();
+          break;
         default:
           throw new IOException("unknown PhysicalPlan type: " + typeNum);
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index c155a2bcd4..e81f4ddcc6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -92,5 +92,8 @@ public enum ConfigPhysicalPlanType {
   GetTriggerJar,
   GetRegionId,
   GetSeriesSlotList,
-  GetTimeSlotList
+  GetTimeSlotList,
+  UpdateTriggersOnTransferNodes,
+  UpdateTriggerLocation,
+  GetTransferringTriggers
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTransferringTriggersPlan.java
similarity index 83%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTransferringTriggersPlan.java
index da1caa3ca7..694fe0438f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTransferringTriggersPlan.java
@@ -26,15 +26,15 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class GetTriggerTablePlan extends ConfigPhysicalPlan {
+public class GetTransferringTriggersPlan extends ConfigPhysicalPlan {
 
-  public GetTriggerTablePlan() {
-    super(ConfigPhysicalPlanType.GetTriggerTable);
+  public GetTransferringTriggersPlan() {
+    super(ConfigPhysicalPlanType.GetTransferringTriggers);
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
-    stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
+    stream.writeInt(ConfigPhysicalPlanType.GetTransferringTriggers.ordinal());
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
index da1caa3ca7..35919cb901 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.consensus.request.read;
 
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -28,15 +29,34 @@ import java.nio.ByteBuffer;
 
 public class GetTriggerTablePlan extends ConfigPhysicalPlan {
 
+  boolean onlyStateful;
+
   public GetTriggerTablePlan() {
     super(ConfigPhysicalPlanType.GetTriggerTable);
   }
 
+  public GetTriggerTablePlan(boolean onlyStateful) {
+    this();
+    this.onlyStateful = onlyStateful;
+  }
+
+  public boolean isOnlyStateful() {
+    return onlyStateful;
+  }
+
+  public void setOnlyStateful(boolean onlyStateful) {
+    this.onlyStateful = onlyStateful;
+  }
+
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
+
+    ReadWriteIOUtils.write(onlyStateful, stream);
   }
 
   @Override
-  protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    this.onlyStateful = ReadWriteIOUtils.readBool(buffer);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java
new file mode 100644
index 0000000000..da5d302c39
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class UpdateTriggerLocationPlan extends ConfigPhysicalPlan {
+
+  private String triggerName;
+  private TDataNodeLocation dataNodeLocation;
+
+  public UpdateTriggerLocationPlan() {
+    super(ConfigPhysicalPlanType.UpdateTriggerLocation);
+  }
+
+  public UpdateTriggerLocationPlan(String triggerName, TDataNodeLocation dataNodeLocation) {
+    super(ConfigPhysicalPlanType.UpdateTriggerLocation);
+    this.triggerName = triggerName;
+    this.dataNodeLocation = dataNodeLocation;
+  }
+
+  public String getTriggerName() {
+    return triggerName;
+  }
+
+  public void setTriggerName(String triggerName) {
+    this.triggerName = triggerName;
+  }
+
+  public TDataNodeLocation getDataNodeLocation() {
+    return dataNodeLocation;
+  }
+
+  public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
+    this.dataNodeLocation = dataNodeLocation;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(getType().ordinal());
+
+    ReadWriteIOUtils.write(triggerName, stream);
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, stream);
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    this.triggerName = ReadWriteIOUtils.readString(buffer);
+    this.dataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java
new file mode 100644
index 0000000000..aa8ad19f18
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UpdateTriggersOnTransferNodesPlan extends ConfigPhysicalPlan {
+
+  private List<TDataNodeLocation> dataNodeLocations;
+
+  public UpdateTriggersOnTransferNodesPlan() {
+    super(ConfigPhysicalPlanType.UpdateTriggersOnTransferNodes);
+  }
+
+  public UpdateTriggersOnTransferNodesPlan(List<TDataNodeLocation> dataNodeLocations) {
+    super(ConfigPhysicalPlanType.UpdateTriggersOnTransferNodes);
+    this.dataNodeLocations = dataNodeLocations;
+  }
+
+  public List<TDataNodeLocation> getDataNodeLocations() {
+    return dataNodeLocations;
+  }
+
+  public void setDataNodeLocations(List<TDataNodeLocation> dataNodeLocations) {
+    this.dataNodeLocations = dataNodeLocations;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(getType().ordinal());
+
+    ReadWriteIOUtils.write(dataNodeLocations.size(), stream);
+    for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+      ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, stream);
+    }
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(size);
+    while (size > 0) {
+      dataNodeLocations.add(ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer));
+      size--;
+    }
+    this.dataNodeLocations = dataNodeLocations;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TransferringTriggersResp.java
similarity index 54%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TransferringTriggersResp.java
index da1caa3ca7..11d6671e44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TransferringTriggersResp.java
@@ -17,26 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.consensus.request.read;
+package org.apache.iotdb.confignode.consensus.response;
 
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.consensus.common.DataSet;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.List;
 
-public class GetTriggerTablePlan extends ConfigPhysicalPlan {
+public class TransferringTriggersResp implements DataSet {
 
-  public GetTriggerTablePlan() {
-    super(ConfigPhysicalPlanType.GetTriggerTable);
+  private List<String> transferringTriggers;
+
+  public TransferringTriggersResp() {}
+
+  public TransferringTriggersResp(List<String> transferringTriggers) {
+    this.transferringTriggers = transferringTriggers;
   }
 
-  @Override
-  protected void serializeImpl(DataOutputStream stream) throws IOException {
-    stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
+  public List<String> getTransferringTriggers() {
+    return transferringTriggers;
   }
 
-  @Override
-  protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
+  public void setTransferringTriggers(List<String> transferringTriggers) {
+    this.transferringTriggers = transferringTriggers;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6718f618b7..1165a52c83 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -229,7 +230,8 @@ public class ConfigManager implements IManager {
       try {
         dataSet = (DataNodeRegisterResp) nodeManager.registerDataNode(registerDataNodePlan);
         dataSet.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
-        dataSet.setTriggerInformation(triggerManager.getTriggerTable().getAllTriggerInformation());
+        dataSet.setTriggerInformation(
+            triggerManager.getTriggerTable(false).getAllTriggerInformation());
       } finally {
         triggerManager.getTriggerInfo().releaseTriggerTableLock();
       }
@@ -801,7 +803,15 @@ public class ConfigManager implements IManager {
   public TGetTriggerTableResp getTriggerTable() {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? triggerManager.getTriggerTable()
+        ? triggerManager.getTriggerTable(false)
+        : new TGetTriggerTableResp(status, Collections.emptyList());
+  }
+
+  @Override
+  public TGetTriggerTableResp getStatefulTriggerTable() {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? triggerManager.getTriggerTable(true)
         : new TGetTriggerTableResp(status, Collections.emptyList());
   }
 
@@ -1190,8 +1200,30 @@ public class ConfigManager implements IManager {
   }
 
   public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
+    Map<Integer, TDataNodeLocation> runningDataNodeLocationMap = new HashMap<>();
+    nodeManager
+        .filterDataNodeThroughStatus(NodeStatus.Running)
+        .forEach(
+            dataNodeConfiguration ->
+                runningDataNodeLocationMap.put(
+                    dataNodeConfiguration.getLocation().getDataNodeId(),
+                    dataNodeConfiguration.getLocation()));
+    if (runningDataNodeLocationMap.isEmpty()) {
+      // no running DataNode is available as a transfer target, so skip the transfer and fail
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    }
+
+    newUnknownDataList.forEach(
+        dataNodeLocation -> runningDataNodeLocationMap.remove(dataNodeLocation.getDataNodeId()));
+
     LOGGER.info("start Transfer of {}", newUnknownDataList);
     // transfer trigger
-    return triggerManager.transferTrigger(newUnknownDataList);
+    TSStatus transferResult =
+        triggerManager.transferTrigger(newUnknownDataList, runningDataNodeLocationMap);
+    if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
+    }
+
+    return transferResult;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index e8b076ccf9..b45989915a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -310,6 +310,9 @@ public interface IManager {
   /** Show trigger & DataNode start */
   TGetTriggerTableResp getTriggerTable();
 
+  /** Get the stateful triggers so that a DataNode can refresh its stateful trigger cache */
+  TGetTriggerTableResp getStatefulTriggerTable();
+
   /** Get Trigger jar */
   TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req);
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index acce3d6fc8..7080a6d56a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -24,10 +24,18 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
+import org.apache.iotdb.confignode.consensus.response.TransferringTriggersResp;
 import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
 import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
@@ -35,6 +43,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.enums.TriggerEvent;
 import org.apache.iotdb.trigger.api.enums.TriggerType;
@@ -46,6 +56,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 public class TriggerManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
@@ -101,10 +112,13 @@ public class TriggerManager {
     return configManager.getProcedureManager().dropTrigger(req.getTriggerName());
   }
 
-  public TGetTriggerTableResp getTriggerTable() {
+  public TGetTriggerTableResp getTriggerTable(boolean onlyStateful) {
     try {
       return ((TriggerTableResp)
-              configManager.getConsensusManager().read(new GetTriggerTablePlan()).getDataset())
+              configManager
+                  .getConsensusManager()
+                  .read(new GetTriggerTablePlan(onlyStateful))
+                  .getDataset())
           .convertToThriftResponse();
     } catch (IOException e) {
       LOGGER.error("Fail to get TriggerTable", e);
@@ -132,8 +146,79 @@ public class TriggerManager {
     }
   }
 
-  public TSStatus transferTrigger(List<TDataNodeLocation> newUnknownDataList) {
-    // TODO implement
+  /**
+   * Step1: Mark Stateful Triggers on UnknownDataNodes as {@link TTriggerState#TRANSFERRING}.
+   *
+   * <p>Step2: Get all Transferring Triggers marked in Step1.
+   *
+   * <p>Step3: For each trigger obtained in Step2, find the DataNode with the lowest load, then
+   * transfer the Stateful Trigger to it and update this information on all DataNodes.
+   *
+   * <p>Step4: Update the newest location on ConfigNodes.
+   *
+   * @param dataNodeLocationMap The DataNodes with {@link
+   *     org.apache.iotdb.commons.cluster.NodeStatus#Running} State
+   * @return result of transferTrigger
+   */
+  public TSStatus transferTrigger(
+      List<TDataNodeLocation> newUnknownDataList,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+    TSStatus transferResult;
+    triggerInfo.acquireTriggerTableLock();
+    try {
+      ConsensusManager consensusManager = configManager.getConsensusManager();
+      NodeManager nodeManager = configManager.getNodeManager();
+
+      transferResult =
+          consensusManager
+              .write(new UpdateTriggersOnTransferNodesPlan(newUnknownDataList))
+              .getStatus();
+      if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return transferResult;
+      }
+
+      List<String> transferringTriggers =
+          ((TransferringTriggersResp)
+                  consensusManager.read(new GetTransferringTriggersPlan()).getDataset())
+              .getTransferringTriggers();
+
+      for (String trigger : transferringTriggers) {
+        TDataNodeLocation newDataNodeLocation =
+            nodeManager.getLowestLoadDataNode(dataNodeLocationMap.keySet());
+
+        transferResult =
+            RpcUtils.squashResponseStatusList(
+                updateTriggerLocation(trigger, newDataNodeLocation, dataNodeLocationMap));
+        if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return transferResult;
+        }
+
+        transferResult =
+            consensusManager
+                .write(new UpdateTriggerLocationPlan(trigger, newDataNodeLocation))
+                .getStatus();
+        if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return transferResult;
+        }
+      }
+    } finally {
+      triggerInfo.releaseTriggerTableLock();
+    }
+
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
+
+  public List<TSStatus> updateTriggerLocation(
+      String triggerName,
+      TDataNodeLocation dataNodeLocation,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+    final TUpdateTriggerLocationReq request =
+        new TUpdateTriggerLocationReq(triggerName, dataNodeLocation);
+
+    AsyncClientHandler<TUpdateTriggerLocationReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.UPDATE_TRIGGER_LOCATION, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index b3fcb65ef7..6b36fa8035 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -255,8 +255,17 @@ public class NodeManager {
           preCheckStatus.getStatus());
       return preCheckStatus;
     }
-    // if add request to queue, then return to client
+
     DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
+    // do transfer of the DataNodes before remove
+    if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataSet.setStatus(
+          new TSStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode())
+              .setMessage("Fail to do transfer of the DataNodes"));
+      return dataSet;
+    }
+    // if add request to queue, then return to client
     boolean registerSucceed =
         configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
     TSStatus status;
@@ -716,8 +725,6 @@ public class NodeManager {
       TSStatus transferResult = configManager.transfer(newUnknownNodes);
       if (transferResult.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         oldUnknownNodes.addAll(newUnknownNodes);
-      } else {
-        LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
       }
     }
   }
@@ -823,6 +830,30 @@ public class NodeManager {
     return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
   }
 
+  /**
+   * Choose, among the given node ids, the one whose cached load score is the smallest.
+   *
+   * <p>An id with no entry in nodeCacheMap is scored Long.MAX_VALUE and is therefore never
+   * chosen; when nothing is chosen the lookup below uses id 0.
+   *
+   * @return the registered TDataNodeLocation mapped to the chosen id
+   */
+  public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
+    int bestId = 0;
+    long bestScore = Long.MAX_VALUE;
+    for (Integer nodeId : nodes) {
+      BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+      long loadScore = (nodeCache != null) ? nodeCache.getLoadScore() : Long.MAX_VALUE;
+      if (loadScore < bestScore) {
+        bestId = nodeId;
+        bestScore = loadScore;
+      }
+    }
+    LOGGER.info(
+        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", bestId, bestScore);
+    return configManager.getNodeManager().getRegisteredDataNodeLocations().get(bestId);
+  }
+
   public boolean isNodeRemoving(int dataNodeId) {
     DataNodeHeartbeatCache cache =
         (DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index bcbdee832a..341129110f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -28,12 +28,18 @@ import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
+import org.apache.iotdb.confignode.consensus.response.TransferringTriggersResp;
 import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
 import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -91,13 +97,7 @@ public class TriggerInfo implements SnapshotProcessor {
     triggerTableLock.unlock();
   }
 
-  /**
-   * Validate whether the trigger can be created
-   *
-   * @param triggerName
-   * @param jarName
-   * @param jarMD5
-   */
+  /** Validate whether the trigger can be created */
   public void validate(String triggerName, String jarName, String jarMD5) {
     if (triggerTable.containsTrigger(triggerName)) {
       throw new TriggerManagementException(
@@ -114,11 +114,7 @@ public class TriggerInfo implements SnapshotProcessor {
     }
   }
 
-  /**
-   * Validate whether the trigger can be dropped
-   *
-   * @param triggerName
-   */
+  /** Validate whether the trigger can be dropped */
   public void validate(String triggerName) {
     if (triggerTable.containsTrigger(triggerName)) {
       return;
@@ -168,10 +164,16 @@ public class TriggerInfo implements SnapshotProcessor {
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
-  public TriggerTableResp getTriggerTable() {
-    return new TriggerTableResp(
-        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
-        triggerTable.getAllTriggerInformation());
+  /**
+   * Build the trigger table response: only stateful triggers when the plan's onlyStateful flag
+   * is set, every trigger otherwise.
+   */
+  public TriggerTableResp getTriggerTable(GetTriggerTablePlan req) {
+    return new TriggerTableResp(
+        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+        req.isOnlyStateful()
+            ? triggerTable.getAllStatefulTriggerInformation()
+            : triggerTable.getAllTriggerInformation());
+  }
 
   public TriggerJarResp getTriggerJar(GetTriggerJarPlan physicalPlan) {
@@ -192,6 +194,22 @@ public class TriggerInfo implements SnapshotProcessor {
     return new TriggerJarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
   }
 
+  public TransferringTriggersResp getTransferringTriggers(GetTransferringTriggersPlan req) {
+    return new TransferringTriggersResp(triggerTable.getTransferringTriggers()); // req unused
+  }
+
+  public TSStatus updateTriggersOnTransferNodes(UpdateTriggersOnTransferNodesPlan physicalPlan) {
+    triggerTable.updateTriggersOnTransferNodes(physicalPlan.getDataNodeLocations());
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); // always SUCCESS
+  }
+
+  public TSStatus updateTriggerLocation(UpdateTriggerLocationPlan physicalPlan) {
+    String triggerName = physicalPlan.getTriggerName(); // used for both table updates below
+    triggerTable.updateTriggerLocation(triggerName, physicalPlan.getDataNodeLocation());
+    triggerTable.setTriggerState(triggerName, TTriggerState.ACTIVE);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   /** only used in Test */
   public Map<String, TriggerInformation> getRawTriggerTable() {
     return triggerTable.getTable();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 4b5f4e49ef..ee2a4849e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -35,7 +35,9 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
 import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
 import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
 import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
@@ -71,7 +73,9 @@ import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchema
 import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
 import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
 import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
@@ -181,9 +185,11 @@ public class ConfigPlanExecutor {
       case ShowPipe:
         return syncInfo.showPipe((ShowPipePlan) req);
       case GetTriggerTable:
-        return triggerInfo.getTriggerTable();
+        return triggerInfo.getTriggerTable((GetTriggerTablePlan) req);
       case GetTriggerJar:
         return triggerInfo.getTriggerJar((GetTriggerJarPlan) req);
+      case GetTransferringTriggers:
+        return triggerInfo.getTransferringTriggers((GetTransferringTriggersPlan) req);
       case GetRegionId:
         return partitionInfo.getRegionId((GetRegionIdPlan) req);
       case GetTimeSlotList:
@@ -267,6 +273,11 @@ public class ConfigPlanExecutor {
         return triggerInfo.deleteTriggerInTable((DeleteTriggerInTablePlan) physicalPlan);
       case UpdateTriggerStateInTable:
         return triggerInfo.updateTriggerStateInTable((UpdateTriggerStateInTablePlan) physicalPlan);
+      case UpdateTriggersOnTransferNodes:
+        return triggerInfo.updateTriggersOnTransferNodes(
+            (UpdateTriggersOnTransferNodesPlan) physicalPlan);
+      case UpdateTriggerLocation:
+        return triggerInfo.updateTriggerLocation((UpdateTriggerLocationPlan) physicalPlan);
       case CreateSchemaTemplate:
         return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan);
       case UpdateRegionLocation:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index b1f73ff5de..5bd99708ec 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -487,9 +487,6 @@ public class ConfigNodeProcedureEnv {
     AsyncClientHandler<TCreateTriggerInstanceReq, TSStatus> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.CREATE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
-    // TODO: The request sent to DataNodes which stateful triggerInstance needn't to be created
-    // don't set
-    // JarFile
     AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
     return clientHandler.getResponseList();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index a9fb7bf404..e8c8ac4813 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -206,11 +206,10 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS
         if (RpcUtils.squashResponseStatusList(
                     env.dropTriggerOnDataNodes(triggerInformation.getTriggerName(), false))
                 .getCode()
-            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        } else {
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           throw new TriggerManagementException(
               String.format(
-                  "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+                  "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [%s]",
                   triggerInformation.getTriggerName()));
         }
         break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index a777f98eac..8603f18648 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -509,6 +509,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getTriggerTable();
   }
 
+  @Override
+  public TGetTriggerTableResp getStatefulTriggerTable() {
+    return configManager.getStatefulTriggerTable(); // pure delegation
+  }
+
   @Override
   public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) {
     // todo: implementation
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 8e5aaff06d..4858362c81 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
 import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
 import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
@@ -85,7 +86,9 @@ import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchema
 import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
 import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
 import org.apache.iotdb.confignode.procedure.Procedure;
@@ -943,11 +946,13 @@ public class ConfigPhysicalPlanSerDeTest {
   }
 
   @Test
-  public void GetTriggerTablePlan() throws IOException {
-    GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan();
-    Assert.assertTrue(
-        ConfigPhysicalPlan.Factory.create(getTriggerTablePlan0.serializeToByteBuffer())
-            instanceof GetTriggerTablePlan);
+  public void GetTriggerTablePlanTest() throws IOException {
+    // Round-trip both flag values: checking only one cannot catch a hard-coded field.
+    for (boolean onlyStateful : new boolean[] {true, false}) {
+      GetTriggerTablePlan plan0 = new GetTriggerTablePlan(onlyStateful);
+      ConfigPhysicalPlan plan1 = ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
+      Assert.assertEquals(onlyStateful, ((GetTriggerTablePlan) plan1).isOnlyStateful());
+    }
+  }
 
   @Test
@@ -1044,4 +1049,61 @@ public class ConfigPhysicalPlanSerDeTest {
             ConfigPhysicalPlan.Factory.create(getSeriesSlotListPlan0.serializeToByteBuffer());
     Assert.assertEquals(getSeriesSlotListPlan0, getSeriesSlotListPlan1);
   }
+
+  @Test
+  public void UpdateTriggersOnTransferNodesPlanTest() throws IOException {
+    // Two locations that differ only in their DataNode id; every endpoint is a local dummy
+    // address.
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(2);
+    for (int dataNodeId : new int[] {10000, 20000}) {
+      dataNodeLocations.add(
+          new TDataNodeLocation(
+              dataNodeId,
+              new TEndPoint("127.0.0.1", 6600),
+              new TEndPoint("127.0.0.1", 7700),
+              new TEndPoint("127.0.0.1", 8800),
+              new TEndPoint("127.0.0.1", 9900),
+              new TEndPoint("127.0.0.1", 11000)));
+    }
+
+    // Serialize the plan and let the factory rebuild it from the raw bytes.
+    UpdateTriggersOnTransferNodesPlan beforeSerDe =
+        new UpdateTriggersOnTransferNodesPlan(dataNodeLocations);
+    ConfigPhysicalPlan afterSerDe =
+        ConfigPhysicalPlan.Factory.create(beforeSerDe.serializeToByteBuffer());
+
+    // The rebuilt plan must be of the same type and carry an equal list of locations, in the
+    // same order.
+    Assert.assertEquals(
+        beforeSerDe.getDataNodeLocations(),
+        ((UpdateTriggersOnTransferNodesPlan) afterSerDe).getDataNodeLocations());
+  }
+
+  @Test
+  public void UpdateTriggerLocationPlanTest() throws IOException {
+    TDataNodeLocation newLocation =
+        new TDataNodeLocation(
+            10000,
+            new TEndPoint("127.0.0.1", 6600),
+            new TEndPoint("127.0.0.1", 7700),
+            new TEndPoint("127.0.0.1", 8800),
+            new TEndPoint("127.0.0.1", 9900),
+            new TEndPoint("127.0.0.1", 11000));
+    UpdateTriggerLocationPlan beforeSerDe = new UpdateTriggerLocationPlan("test", newLocation);
+    UpdateTriggerLocationPlan afterSerDe =
+        (UpdateTriggerLocationPlan)
+            ConfigPhysicalPlan.Factory.create(beforeSerDe.serializeToByteBuffer());
+
+    // both values carried by the plan must survive the round trip
+    Assert.assertEquals(beforeSerDe.getTriggerName(), afterSerDe.getTriggerName());
+    Assert.assertEquals(beforeSerDe.getDataNodeLocation(), afterSerDe.getDataNodeLocation());
+  }
+
+  @Test
+  public void GetTransferringTriggersPlanTest() throws IOException {
+    GetTransferringTriggersPlan plan0 = new GetTransferringTriggersPlan();
+    ConfigPhysicalPlan plan1 = ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
+    // only the type restored by the factory is checked
+    Assert.assertTrue(plan1 instanceof GetTransferringTriggersPlan);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index eed43d15fd..fbc09275ae 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.commons.trigger;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -27,9 +28,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /** This Class used to save the information of Triggers and implements methods of manipulate it. */
 @NotThreadSafe
@@ -78,6 +82,42 @@ public class TriggerTable {
     return new ArrayList<>(triggerTable.values());
   }
 
+  public List<TriggerInformation> getAllStatefulTriggerInformation() {
+    List<TriggerInformation> statefulTriggers = new ArrayList<>(triggerTable.values()); // copy
+    statefulTriggers.removeIf(triggerInformation -> !triggerInformation.isStateful());
+    return statefulTriggers;
+  }
+
+  /** List the names of all triggers whose state is TRANSFERRING. */
+  public List<String> getTransferringTriggers() {
+    // filter on the state first, then project each remaining trigger to its name
+    return triggerTable.values().stream()
+        .filter(information -> information.getTriggerState() == TTriggerState.TRANSFERRING)
+        .map(TriggerInformation::getTriggerName)
+        .collect(Collectors.toList());
+  }
+
+  /** Mark as TRANSFERRING every stateful trigger whose DataNodeLocation is in transferNodes. */
+  public void updateTriggersOnTransferNodes(List<TDataNodeLocation> transferNodes) {
+    // copy into a HashSet so the membership test below is a hash lookup
+    Set<TDataNodeLocation> transferNodeSet = new HashSet<>(transferNodes);
+    for (TriggerInformation information : triggerTable.values()) {
+      if (!information.isStateful()) {
+        continue;
+      }
+      if (transferNodeSet.contains(information.getDataNodeLocation())) {
+        information.setTriggerState(TTriggerState.TRANSFERRING);
+      }
+    }
+  }
+
+  /** Point the named trigger at dataNodeLocation; the trigger must already be in the table. */
+  public void updateTriggerLocation(String triggerName, TDataNodeLocation dataNodeLocation) {
+    // The stored TriggerInformation is mutated in place, so it stays mapped to triggerName
+    // without being put back.
+    triggerTable.get(triggerName).setDataNodeLocation(dataNodeLocation);
+  }
+
   public boolean isEmpty() {
     return triggerTable.isEmpty();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 9adc73dbd8..a3d4f38722 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -953,6 +953,22 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TGetTriggerTableResp getStatefulTriggerTable() throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) { // at most RETRY_NUM attempts, reconnecting in between
+      try {
+        TGetTriggerTableResp resp = client.getStatefulTriggerTable(); // not getTriggerTable()
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 4b0e82f43b..75fd26fa74 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -487,10 +487,6 @@ public class DataNode implements DataNodeMBean {
     List<TriggerInformation> res = new ArrayList<>();
     for (TriggerInformation triggerInformation :
         resourcesInformationHolder.getTriggerInformationList()) {
-      if (triggerInformation.isStateful()) {
-        // jar of stateful trigger is not needed
-        continue;
-      }
       // jar does not exist, add current triggerInformation to list
       if (!TriggerExecutableManager.getInstance()
           .hasFileUnderLibRoot(triggerInformation.getJarName())) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2341017cd3..30a2eeb644 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -140,6 +140,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
@@ -1032,6 +1033,22 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
+  @Override
+  public TSStatus updateTriggerLocation(TUpdateTriggerLocationReq req) throws TException {
+    try {
+      TriggerManagementService.getInstance()
+          .updateLocationOfStatefulTrigger(req.getTriggerName(), req.getNewLocation());
+    } catch (Exception e) {
+      // The exception is the trailing argument, so the logger reports it with its stack trace;
+      // it must not get a "{}" of its own, which would be printed literally.
+      LOGGER.error(
+          "Error occurred during update Location for trigger: {}.", req.getTriggerName(), e);
+      return new TSStatus(TSStatusCode.UPDATE_TRIGGER_LOCATION_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   @Override
   public TFireTriggerResp fireTrigger(TFireTriggerReq req) {
     String triggerName = req.getTriggerName();
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index fed28359cc..62adc03055 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -135,15 +135,40 @@ public class TriggerManagementService {
     }
   }
 
-  public void updateLocationOfStatefulTrigger(
-      String triggerName, TDataNodeLocation tDataNodeLocation) {
+  public void updateLocationOfStatefulTrigger(String triggerName, TDataNodeLocation newLocation)
+      throws IOException {
     try {
       acquireLock();
       TriggerInformation triggerInformation = triggerTable.getTriggerInformation(triggerName);
-      triggerInformation.setDataNodeLocation(tDataNodeLocation);
-      triggerTable.setTriggerInformation(triggerName, triggerInformation);
-    } catch (Exception e) {
-      LOGGER.warn("Failed to update location of trigger({}), the cause is: {}", triggerName, e);
+      if (triggerInformation == null || !triggerInformation.isStateful()) {
+        return; // unknown or stateless trigger: nothing to update
+      }
+      triggerInformation.setDataNodeLocation(newLocation);
+      triggerTable.addTriggerInformation(triggerName, triggerInformation);
+      if (newLocation.getDataNodeId() != DATA_NODE_ID) {
+        // The instance of stateful trigger is created on another DataNode. We need to drop the
+        // instance if it exists on this DataNode
+        TriggerExecutor triggerExecutor = executorMap.remove(triggerName);
+        if (triggerExecutor != null) {
+          triggerExecutor.onDrop();
+        }
+      } else {
+        TriggerExecutor triggerExecutor = executorMap.get(triggerName);
+        if (triggerExecutor != null) {
+          return; // an executor for this trigger is already registered here
+        }
+        // newLocation of stateful trigger is this DataNode, we need to create its instance if it
+        // does not exist.
+        try (TriggerClassLoader currentActiveClassLoader =
+            TriggerClassLoaderManager.getInstance().updateAndGetActiveClassLoader()) {
+          TriggerExecutor newExecutor =
+              new TriggerExecutor(
+                  triggerInformation,
+                  constructTriggerInstance(
+                      triggerInformation.getClassName(), currentActiveClassLoader));
+          executorMap.put(triggerName, newExecutor);
+        }
+      }
     } finally {
       releaseLock();
     }
@@ -182,7 +207,6 @@ public class TriggerManagementService {
 
   private void checkIfRegistered(TriggerInformation triggerInformation)
       throws TriggerManagementException {
-
     String triggerName = triggerInformation.getTriggerName();
     String jarName = triggerInformation.getJarName();
     if (triggerTable.containsTrigger(triggerName)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 898d6183cf..c5d56e571a 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -81,6 +81,7 @@ public enum TSStatusCode {
   CREATE_TRIGGER_INSTANCE_ERROR(362),
   ACTIVE_TRIGGER_INSTANCE_ERROR(363),
   DROP_TRIGGER_INSTANCE_ERROR(364),
+  UPDATE_TRIGGER_LOCATION_ERROR(365),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index fb629fb094..6c87d06b5f 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -314,6 +314,8 @@ enum TTriggerState {
   ACTIVE
   // The intermediate state of Drop trigger, the cluster is in the process of removing the trigger.
   DROPPING
+  // The intermediate state of Transfer trigger, the cluster is in the process of transferring the trigger.
+  TRANSFERRING
 }
 
 struct TCreateTriggerReq {
@@ -785,10 +787,15 @@ service IConfigNodeRPCService {
   TGetLocationForTriggerResp getLocationOfStatefulTrigger(string triggerName)
 
   /**
-     * Return the trigger table of config leader
+     * Return the trigger table
      */
   TGetTriggerTableResp getTriggerTable()
 
+  /**
+     * Return the Stateful trigger table
+     */
+  TGetTriggerTableResp getStatefulTriggerTable()
+
   /**
      * Return the trigger jar list of the trigger name list
      */
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index d75aaa8be8..aa43b7a300 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -187,6 +187,11 @@ struct TDropTriggerInstanceReq {
   2: required bool needToDeleteJarFile
 }
 
+struct TUpdateTriggerLocationReq {
+  1: required string triggerName // trigger whose location is updated
+  2: required common.TDataNodeLocation newLocation // location the receiver should record
+}
+
 struct TFireTriggerReq {
   1: required string triggerName
   2: required binary tablet
@@ -471,6 +476,13 @@ service IDataNodeRPCService {
    **/
   common.TSStatus dropTriggerInstance(TDropTriggerInstanceReq req)
 
+  /**
+   * Config node will renew DataNodeLocation of a stateful trigger.
+   *
+   * @param trigger name, new DataNodeLocation
+   **/
+  common.TSStatus updateTriggerLocation (TUpdateTriggerLocationReq req)
+
   /**
     * Fire a stateful trigger on current data node.
     *