You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/18 21:16:55 UTC

[iotdb] 01/03: pipe agent skeleton

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5692
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a5139bd4420dba28325eecb91637943f7cfadeb9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Mar 17 12:24:28 2023 +0800

    pipe agent skeleton
---
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  | 53 ++++++++++++
 .../iotdb/db/pipe/agent/PipePluginAgent.java       |  4 +-
 .../iotdb/db/pipe/agent/PipeRuntimeAgent.java      | 35 ++++++++
 .../apache/iotdb/db/pipe/agent/PipeTaskAgent.java  | 35 ++++++++
 .../impl/DataNodeInternalRPCServiceImpl.java       | 98 ++--------------------
 5 files changed, 135 insertions(+), 90 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
new file mode 100644
index 0000000000..e94807e8e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+public class PipeAgent {
+
+  /** Private constructor to prevent users from creating a new instance. */
+  private PipeAgent() {}
+
+  /**
+   * Get the singleton instance of PipeTaskAgent.
+   *
+   * @return the singleton instance of PipeTaskAgent
+   */
+  public static PipeTaskAgent task() {
+    return PipeTaskAgent.getInstance();
+  }
+
+  /**
+   * Get the singleton instance of PipePluginAgent.
+   *
+   * @return the singleton instance of PipePluginAgent
+   */
+  public static PipePluginAgent plugin() {
+    return PipePluginAgent.getInstance();
+  }
+
+  /**
+   * Get the singleton instance of PipeRuntimeAgent.
+   *
+   * @return the singleton instance of PipeRuntimeAgent
+   */
+  public static PipeRuntimeAgent runtime() {
+    return PipeRuntimeAgent.getInstance();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
index d820441dac..a0a62ca118 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
@@ -181,11 +181,13 @@ public class PipePluginAgent {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
+  private PipePluginAgent() {}
+
   private static class PipePluginAgentServiceHolder {
     private static final PipePluginAgent INSTANCE = new PipePluginAgent();
   }
 
-  public static PipePluginAgent getInstance() {
+  static PipePluginAgent getInstance() {
     return PipePluginAgentServiceHolder.INSTANCE;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
new file mode 100644
index 0000000000..fe9ab3ee8a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+public class PipeRuntimeAgent {
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeRuntimeAgent() {}
+
+  private static class PipeRuntimeAgentHolder {
+    private static final PipeRuntimeAgent INSTANCE = new PipeRuntimeAgent();
+  }
+
+  static PipeRuntimeAgent getInstance() {
+    return PipeRuntimeAgent.PipeRuntimeAgentHolder.INSTANCE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
new file mode 100644
index 0000000000..75c8e14636
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+public class PipeTaskAgent {
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskAgent() {}
+
+  private static class PipeTaskAgentHolder {
+    private static final PipeTaskAgent INSTANCE = new PipeTaskAgent();
+  }
+
+  static PipeTaskAgent getInstance() {
+    return PipeTaskAgent.PipeTaskAgentHolder.INSTANCE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 60edda2b56..308afe736d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,14 +19,8 @@
 
 package org.apache.iotdb.db.service.thrift.impl;
 
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.common.rpc.thrift.*;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -92,17 +86,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackPreDeactivateTemplateNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.*;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
@@ -115,68 +104,7 @@ import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.metrics.type.AutoGauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
-import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteModelMetricsReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceInfoReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
-import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
-import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
-import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
-import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
-import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
+import org.apache.iotdb.mpp.rpc.thrift.*;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
@@ -185,8 +113,6 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.record.Tablet;
-
-import com.google.common.collect.ImmutableList;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -195,12 +121,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1147,8 +1068,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     if (configNodeLocations != null) {
       ConfigNodeInfo.getInstance()
           .updateConfigNodeList(
-              configNodeLocations
-                  .parallelStream()
+              configNodeLocations.parallelStream()
                   .map(TConfigNodeLocation::getInternalEndPoint)
                   .collect(Collectors.toList()));
     }
@@ -1482,7 +1402,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   public TSStatus createPipePlugin(TCreatePipePluginInstanceReq req) {
     try {
       PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(req.pipePluginMeta);
-      PipePluginAgent.getInstance().register(pipePluginMeta, req.jarFile);
+      PipeAgent.plugin().register(pipePluginMeta, req.jarFile);
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())
@@ -1493,7 +1413,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TSStatus dropPipePlugin(TDropPipePluginInstanceReq req) {
     try {
-      PipePluginAgent.getInstance().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
+      PipeAgent.plugin().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())