You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/09/28 15:47:59 UTC

[GitHub] [iotdb] CRZbulabula commented on a diff in pull request #7474: [IOTDB-4500]CreateTrigger and ShowTrigger process on ConfigNode

CRZbulabula commented on code in PR #7474:
URL: https://github.com/apache/iotdb/pull/7474#discussion_r982561387


##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java:
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.CreateTriggerState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** remove config node procedure */
+public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerState> {
+  private static final Logger LOG = LoggerFactory.getLogger(CreateTriggerProcedure.class);
+  private static final int retryThreshold = 5;
+
+  private TriggerInformation triggerInformation;
+  private Binary jarFile;
+
+  public CreateTriggerProcedure() {
+    super();
+  }
+
+  public CreateTriggerProcedure(TriggerInformation triggerInformation, Binary jarFile) {
+    super();
+    this.triggerInformation = triggerInformation;
+    this.jarFile = jarFile;
+  }
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateTriggerState state) {
+    if (triggerInformation == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      switch (state) {
+        case INIT:
+          LOG.info("Start to create trigger [{}]", triggerInformation.getTriggerName());
+
+          TriggerInfo triggerInfo = env.getConfigManager().getTriggerManager().getTriggerInfo();
+          triggerInfo.acquireTriggerTableLock();
+          triggerInfo.validate(
+              triggerInformation.getTriggerName(),
+              triggerInformation.getJarName(),
+              triggerInformation.getJarFileMD5());
+          setNextState(CreateTriggerState.VALIDATED);
+          break;
+
+        case VALIDATED:
+          ConfigManager configManager = env.getConfigManager();
+          boolean needToSaveJar =
+              configManager
+                  .getTriggerManager()
+                  .getTriggerInfo()
+                  .needToSaveJar(triggerInformation.getJarName());
+
+          LOG.info(
+              "Start to add trigger [{}] in TriggerTable on Config Nodes, needToSaveJar[{}]",
+              triggerInformation.getTriggerName(),
+              needToSaveJar);
+
+          ConsensusWriteResponse response =
+              configManager
+                  .getConsensusManager()
+                  .write(
+                      new AddTriggerInTablePlan(
+                          triggerInformation, needToSaveJar ? jarFile : null));
+          if (!response.isSuccessful()) {
+            throw new TriggerManagementException(response.getErrorMessage());
+          }
+
+          setNextState(CreateTriggerState.CONFIG_NODE_INACTIVE);
+          break;
+
+        case CONFIG_NODE_INACTIVE:
+          LOG.info(
+              "Start to create triggerInstance [{}] on Data Nodes",
+              triggerInformation.getTriggerName());
+
+          if (RpcUtils.squashResponseStatusList(
+                      env.createTriggerOnDataNodes(triggerInformation, jarFile))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(CreateTriggerState.DATA_NODE_INACTIVE);
+          } else {
+            throw new TriggerManagementException(
+                String.format(
+                    "Fail to create triggerInstance [%s] on Data Nodes",
+                    triggerInformation.getTriggerName()));
+          }
+          break;
+
+        case DATA_NODE_INACTIVE:
+          LOG.info(
+              "Start to active trigger [{}] on Data Nodes", triggerInformation.getTriggerName());
+
+          if (RpcUtils.squashResponseStatusList(
+                      env.activeTriggerOnDataNodes(triggerInformation.getTriggerName()))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(CreateTriggerState.DATA_NODE_ACTIVE);
+          } else {
+            throw new TriggerManagementException(
+                String.format(
+                    "Fail to active triggerInstance [%s] on Data Nodes",
+                    triggerInformation.getTriggerName()));
+          }
+          break;
+
+        case DATA_NODE_ACTIVE:
+          LOG.info(
+              "Start to active trigger [{}] on Config Nodes", triggerInformation.getTriggerName());
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(
+                  new UpdateTriggerStateInTablePlan(
+                      triggerInformation.getTriggerName(), TTriggerState.ACTIVE));
+          setNextState(CreateTriggerState.CONFIG_NODE_ACTIVE);
+          break;
+
+        case CONFIG_NODE_ACTIVE:
+          env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        LOG.error("{}", e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        LOG.error(
+            "Retrievable error trying to create trigger [{}], state [{}]",
+            triggerInformation.getTriggerName(),
+            state,
+            e);
+        if (getCycles() > retryThreshold) {
+          setFailure(
+              new ProcedureException(
+                  String.format(
+                      "Fail to create trigger [%s] at STATE [%s]",
+                      triggerInformation.getTriggerName(), state)));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, CreateTriggerState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case INIT:
+        LOG.info("Start [INIT] rollback of trigger [{}]", triggerInformation.getTriggerName());
+
+        env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+        break;
+
+      case VALIDATED:
+        LOG.info("Start [VALIDATED] rollback of trigger [{}]", triggerInformation.getTriggerName());
+
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new DeleteTriggerInTablePlan(triggerInformation.getTriggerName()));
+        break;
+
+      case CONFIG_NODE_INACTIVE:
+        LOG.info(
+            "Start to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+
+        if (RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerInformation))
+                .getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        } else {
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+
+      case DATA_NODE_INACTIVE:
+        LOG.info(
+            "Start to [DATA_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+
+        if (RpcUtils.squashResponseStatusList(
+                    env.inactiveTriggerOnDataNodes(triggerInformation.getTriggerName()))
+                .getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [DATA_NODE_INACTIVE] rollback of trigger [%s]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+
+      default:
+        break;
+    }
+  }
+
+  @Override
+  protected boolean isRollbackSupported(CreateTriggerState state) {
+    return true;
+  }
+
+  @Override
+  protected CreateTriggerState getState(int stateId) {
+    return CreateTriggerState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(CreateTriggerState createTriggerState) {
+    return createTriggerState.ordinal();
+  }
+
+  @Override
+  protected CreateTriggerState getInitialState() {
+    return CreateTriggerState.INIT;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.CREATE_TRIGGER_PROCEDURE.ordinal());
+    super.serialize(stream);
+    triggerInformation.serialize(stream);
+    if (jarFile == null) {
+      ReadWriteIOUtils.write(true, stream);
+    } else {
+      ReadWriteIOUtils.write(false, stream);
+      ReadWriteIOUtils.write(jarFile, stream);
+    }
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    triggerInformation = TriggerInformation.deserialize(byteBuffer);
+    if (ReadWriteIOUtils.readBool(byteBuffer)) {
+      return;
+    }
+    jarFile = ReadWriteIOUtils.readBinary(byteBuffer);
+  }

Review Comment:
   We should add serialize and deserialize tests for CreateTriggerProcedure~



##########
confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.trigger.api.enums.TriggerType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TriggerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
+
+  private final ConfigManager configManager;
+  private final TriggerInfo triggerInfo;
+
+  public TriggerManager(ConfigManager configManager, TriggerInfo triggerInfo) {
+    this.configManager = configManager;
+    this.triggerInfo = triggerInfo;
+  }
+
+  public TriggerInfo getTriggerInfo() {
+    return triggerInfo;
+  }
+
+  public TSStatus createTrigger(TCreateTriggerReq req) {
+    boolean isStateful = TriggerType.construct(req.getTriggerType()) == TriggerType.STATEFUL;
+    TDataNodeLocation dataNodeLocation =
+        isStateful ? configManager.getNodeManager().getLowestLoadDataNode() : null;
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            (PartialPath) PathDeserializeUtil.deserialize(req.pathPattern),
+            req.getTriggerName(),
+            req.getClassName(),
+            req.getJarPath(),
+            req.getAttributes(),
+            TriggerEvent.construct(req.triggerEvent),
+            TTriggerState.INACTIVE,
+            isStateful,
+            dataNodeLocation,
+            req.getJarMD5());
+    return configManager
+        .getProcedureManager()
+        .createTrigger(triggerInformation, new Binary(req.getJarFile()));
+  }
+
+  public TSStatus dropTrigger(TDropTriggerReq req) {
+    // TODO
+    return null;
+  }
+
+  public TGetTriggerTableResp getTriggerTable() {
+    try {
+      return ((TriggerTableResp)
+              configManager.getConsensusManager().read(new GetTriggerTablePlan()).getDataset())
+          .convertToThriftResponse();
+    } catch (IOException e) {
+      e.printStackTrace();
+      return new TGetTriggerTableResp(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()), null);

Review Comment:
   It's better to use the e.message() as the response TSStatus's message. Therefore the client that connects to DataNode will receive the reason why cause Exception.



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java:
##########
@@ -362,6 +370,79 @@ public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
     return getPartitionManager().getAllReplicaSets(storageGroup);
   }
 
+  public List<TSStatus> createTriggerOnDataNodes(
+      TriggerInformation triggerInformation, Binary jarFile) throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
+    final TCreateTriggerInstanceReq request =
+        new TCreateTriggerInstanceReq(
+            triggerInformation.serialize(), ByteBuffer.wrap(jarFile.getValues()));
+    // TODO: The request sent to DataNodes which stateful trigger needn't to be created don't set
+    // JarFile
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation triggerInformation)
+      throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
+    final TDropTriggerInstanceReq request =
+        new TDropTriggerInstanceReq(triggerInformation.getTriggerName(), false);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.DROP_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();

Review Comment:
   The same as before



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java:
##########
@@ -362,6 +370,79 @@ public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
     return getPartitionManager().getAllReplicaSets(storageGroup);
   }
 
+  public List<TSStatus> createTriggerOnDataNodes(
+      TriggerInformation triggerInformation, Binary jarFile) throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
+    final TCreateTriggerInstanceReq request =
+        new TCreateTriggerInstanceReq(
+            triggerInformation.serialize(), ByteBuffer.wrap(jarFile.getValues()));
+    // TODO: The request sent to DataNodes which stateful trigger needn't to be created don't set
+    // JarFile
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation triggerInformation)
+      throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
+    final TDropTriggerInstanceReq request =
+        new TDropTriggerInstanceReq(triggerInformation.getTriggerName(), false);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.DROP_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
+    final TActiveTriggerInstanceReq request = new TActiveTriggerInstanceReq(triggerName);
+
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.ACTIVE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();

Review Comment:
   The same as before



##########
confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.trigger.api.enums.TriggerType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TriggerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
+
+  private final ConfigManager configManager;
+  private final TriggerInfo triggerInfo;
+
+  public TriggerManager(ConfigManager configManager, TriggerInfo triggerInfo) {
+    this.configManager = configManager;
+    this.triggerInfo = triggerInfo;
+  }
+
+  public TriggerInfo getTriggerInfo() {
+    return triggerInfo;
+  }
+
+  public TSStatus createTrigger(TCreateTriggerReq req) {
+    boolean isStateful = TriggerType.construct(req.getTriggerType()) == TriggerType.STATEFUL;
+    TDataNodeLocation dataNodeLocation =
+        isStateful ? configManager.getNodeManager().getLowestLoadDataNode() : null;
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            (PartialPath) PathDeserializeUtil.deserialize(req.pathPattern),
+            req.getTriggerName(),
+            req.getClassName(),
+            req.getJarPath(),
+            req.getAttributes(),
+            TriggerEvent.construct(req.triggerEvent),
+            TTriggerState.INACTIVE,
+            isStateful,
+            dataNodeLocation,
+            req.getJarMD5());
+    return configManager
+        .getProcedureManager()
+        .createTrigger(triggerInformation, new Binary(req.getJarFile()));
+  }
+
+  public TSStatus dropTrigger(TDropTriggerReq req) {
+    // TODO
+    return null;
+  }
+
+  public TGetTriggerTableResp getTriggerTable() {
+    try {
+      return ((TriggerTableResp)
+              configManager.getConsensusManager().read(new GetTriggerTablePlan()).getDataset())
+          .convertToThriftResponse();
+    } catch (IOException e) {
+      e.printStackTrace();

Review Comment:
   Better to use LOGGER.error() instead of e.printStackTrace()



##########
confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.trigger.api.enums.TriggerType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TriggerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
+
+  private final ConfigManager configManager;
+  private final TriggerInfo triggerInfo;
+
+  public TriggerManager(ConfigManager configManager, TriggerInfo triggerInfo) {
+    this.configManager = configManager;
+    this.triggerInfo = triggerInfo;
+  }
+
+  public TriggerInfo getTriggerInfo() {
+    return triggerInfo;
+  }
+
+  public TSStatus createTrigger(TCreateTriggerReq req) {
+    boolean isStateful = TriggerType.construct(req.getTriggerType()) == TriggerType.STATEFUL;
+    TDataNodeLocation dataNodeLocation =
+        isStateful ? configManager.getNodeManager().getLowestLoadDataNode() : null;

Review Comment:
   Why not left an annotation here to show the differences of STATEFUL and STATELESS~



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java:
##########
@@ -362,6 +370,79 @@ public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
     return getPartitionManager().getAllReplicaSets(storageGroup);
   }
 
+  public List<TSStatus> createTriggerOnDataNodes(
+      TriggerInformation triggerInformation, Binary jarFile) throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
+    final TCreateTriggerInstanceReq request =
+        new TCreateTriggerInstanceReq(
+            triggerInformation.serialize(), ByteBuffer.wrap(jarFile.getValues()));
+    // TODO: The request sent to DataNodes which stateful trigger needn't to be created don't set
+    // JarFile
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);

Review Comment:
   Is it always necessary to send Trigger related requests to all registered DataNodes? Since a DataNode could be in UNKNOWN status if there exists network failure or the DataNode is shutdown, for which the request will fail absolutely. We'd better pay attention to this case because it might be a common scenario~



##########
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java:
##########
@@ -66,6 +66,10 @@ public enum ConfigPhysicalPlanType {
   RemoveConfigNode,
   CreateFunction,
   DropFunction,
+  AddTriggerInTable,
+  DeleteTriggerInTable,
+  GetTriggerTable,
+  UpdateTriggerStateInTable,

Review Comment:
   Better add them in the tail of ConfigPhysicalPlanType



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java:
##########
@@ -362,6 +370,79 @@ public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
     return getPartitionManager().getAllReplicaSets(storageGroup);
   }
 
+  public List<TSStatus> createTriggerOnDataNodes(
+      TriggerInformation triggerInformation, Binary jarFile) throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
+    final TCreateTriggerInstanceReq request =
+        new TCreateTriggerInstanceReq(
+            triggerInformation.serialize(), ByteBuffer.wrap(jarFile.getValues()));
+    // TODO: The request sent to DataNodes which stateful trigger needn't to be created don't set
+    // JarFile
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation triggerInformation)
+      throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();

Review Comment:
   The same as before



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org