You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/23 18:47:44 UTC
[iotdb] branch master updated: [IOTDB-5912] Pipe: Handle PipeMeta changes through heartbeat (#9922)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e46be0e9623 [IOTDB-5912] Pipe: Handle PipeMeta changes through heartbeat (#9922)
e46be0e9623 is described below
commit e46be0e9623791cf590230aa0fe965ecf3e0c103
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Wed May 24 02:47:35 2023 +0800
[IOTDB-5912] Pipe: Handle PipeMeta changes through heartbeat (#9922)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../heartbeat/DataNodeHeartbeatHandler.java | 10 +-
.../consensus/request/ConfigPhysicalPlan.java | 6 +-
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../PipeHandleLeaderChangePlan.java | 2 +-
.../pipe/runtime/PipeHandleMetaChangePlan.java | 67 +++++
.../iotdb/confignode/manager/ProcedureManager.java | 22 ++
.../manager/load/service/HeartbeatService.java | 16 +-
.../pipe/runtime/PipeRuntimeCoordinator.java | 21 ++
.../persistence/executor/ConfigPlanExecutor.java | 7 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 13 +-
.../persistence/pipe/PipeTaskOperation.java | 2 +-
.../runtime/PipeHandleLeaderChangeProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 285 +++++++++++++++++++++
.../procedure/store/ProcedureFactory.java | 6 +
.../confignode/procedure/store/ProcedureType.java | 3 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 44 +++-
.../iotdb/confignode/persistence/PipeInfoTest.java | 1 -
.../runtime/PipeHandleMetaChangeProcedureTest.java | 96 +++++++
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 13 +-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 31 +++
.../impl/DataNodeInternalRPCServiceImpl.java | 4 +
thrift/src/main/thrift/datanode.thrift | 2 +
22 files changed, 629 insertions(+), 25 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index a7348e5d6af..8cfcda8c87e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -42,13 +43,16 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
private final Consumer<Map<Integer, Long>> schemaQuotaRespProcess;
+ private final PipeRuntimeCoordinator pipeRuntimeCoordinator;
+
public DataNodeHeartbeatHandler(
int nodeId,
LoadCache loadCache,
Map<Integer, Long> deviceNum,
Map<Integer, Long> timeSeriesNum,
Map<Integer, Long> regionDisk,
- Consumer<Map<Integer, Long>> schemaQuotaRespProcess) {
+ Consumer<Map<Integer, Long>> schemaQuotaRespProcess,
+ PipeRuntimeCoordinator pipeRuntimeCoordinator) {
this.nodeId = nodeId;
this.loadCache = loadCache;
@@ -56,6 +60,7 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
this.timeSeriesNum = timeSeriesNum;
this.regionDisk = regionDisk;
this.schemaQuotaRespProcess = schemaQuotaRespProcess;
+ this.pipeRuntimeCoordinator = pipeRuntimeCoordinator;
}
@Override
@@ -106,6 +111,9 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
break;
}
}
+ if (heartbeatResp.getPipeMetaList() != null) {
+ pipeRuntimeCoordinator.parseHeartbeat(nodeId, heartbeatResp.getPipeMetaList());
+ }
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 2991f05ea65..ff915a93e97 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -77,9 +77,10 @@ import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStat
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -396,6 +397,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case PipeHandleLeaderChange:
plan = new PipeHandleLeaderChangePlan();
break;
+ case PipeHandleMetaChange:
+ plan = new PipeHandleMetaChangePlan();
+ break;
case GetRegionId:
plan = new GetRegionIdPlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index d36a9973cd5..b4ab2ad4953 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -182,6 +182,7 @@ public enum ConfigPhysicalPlanType {
/** Pipe Runtime */
PipeHandleLeaderChange((short) 1600),
+ PipeHandleMetaChange((short) 1601),
;
private final short planType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/coordinator/PipeHandleLeaderChangePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleLeaderChangePlan.java
similarity index 99%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/coordinator/PipeHandleLeaderChangePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleLeaderChangePlan.java
index 8686e6e75f8..207cbe34b04 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/coordinator/PipeHandleLeaderChangePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleLeaderChangePlan.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
new file mode 100644
index 00000000000..983574618ac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.pipe.runtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Consensus plan of type PipeHandleMetaChange that carries a list of {@link PipeMeta}. */
+public class PipeHandleMetaChangePlan extends ConfigPhysicalPlan {
+
+  private List<PipeMeta> pipeMetaList = new ArrayList<>();
+
+  public PipeHandleMetaChangePlan() {
+    super(ConfigPhysicalPlanType.PipeHandleMetaChange);
+  }
+
+  public PipeHandleMetaChangePlan(List<PipeMeta> pipeMetaList) {
+    this();
+    this.pipeMetaList = pipeMetaList;
+  }
+
+  public List<PipeMeta> getPipeMetaList() {
+    return pipeMetaList;
+  }
+
+  /** Writes the plan type, then the element count, then every pipe meta in list order. */
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+    stream.writeInt(pipeMetaList.size());
+    for (final PipeMeta meta : pipeMetaList) {
+      meta.serialize(stream);
+    }
+  }
+
+  /** Reads the element count, then appends that many deserialized pipe metas to the list. */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    for (int remaining = buffer.getInt(); remaining > 0; remaining--) {
+      pipeMetaList.add(PipeMeta.deserialize(buffer));
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4d67bd09a0e..a33072f8b19 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
@@ -87,6 +88,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -772,6 +774,26 @@ public class ProcedureManager {
}
}
+  /** Runs a PipeHandleMetaChangeProcedure and waits for it; any failure becomes PIPE_ERROR. */
+  public TSStatus pipeHandleMetaChange(
+      int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
+    try {
+      final PipeHandleMetaChangeProcedure procedure =
+          new PipeHandleMetaChangeProcedure(dataNodeId, pipeMetaByteBufferListFromDataNode);
+      final long submittedId = executor.submitProcedure(procedure);
+
+      final List<TSStatus> procedureStatuses = new ArrayList<>();
+      if (waitingProcedureFinished(Collections.singletonList(submittedId), procedureStatuses)) {
+        return RpcUtils.SUCCESS_STATUS;
+      }
+
+      final String failureMessage = procedureStatuses.get(0).getMessage();
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(failureMessage);
+    } catch (Exception e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
/**
* Waiting until the specific procedures finished
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 6e2eb055428..38a6d8dc5f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -43,7 +43,7 @@ import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/** Maintain the Cluster-Heartbeat-Service. */
public class HeartbeatService {
@@ -63,7 +63,7 @@ public class HeartbeatService {
private Future<?> currentHeartbeatFuture;
private final ScheduledExecutorService heartBeatExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
- private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+ private final AtomicLong heartbeatCounter = new AtomicLong(0);
public HeartbeatService(IManager configManager, LoadCache loadCache) {
this.configManager = configManager;
@@ -124,14 +124,17 @@ public class HeartbeatService {
// We sample DataNode's load in every 10 heartbeat loop
heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount());
-
- /* Update heartbeat counter */
- heartbeatCounter.getAndUpdate(x -> (x + 1) % 10);
+ // We collect pipe meta in every 100 heartbeat loop, TODO: make this configurable
+ heartbeatReq.setNeedPipeMetaList(heartbeatCounter.get() % 100 == 0);
if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
}
+
+ /* Update heartbeat counter */
+ heartbeatCounter.getAndIncrement();
+
return heartbeatReq;
}
@@ -175,7 +178,8 @@ public class HeartbeatService {
configManager.getClusterQuotaManager().getDeviceNum(),
configManager.getClusterQuotaManager().getTimeSeriesNum(),
configManager.getClusterQuotaManager().getRegionDisk(),
- configManager.getClusterSchemaManager()::updateSchemaQuota);
+ configManager.getClusterSchemaManager()::updateSchemaQuota,
+ configManager.getPipeManager().getPipeRuntimeCoordinator());
configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
AsyncDataNodeHeartbeatClientPool.getInstance()
.getDataNodeHeartBeat(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 9a117a0fbd1..3455a6c1369 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -29,10 +29,13 @@ import org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
@@ -95,4 +98,22 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
public void stopPipeMetaSync() {
pipeMetaSyncer.stop();
}
+
+  /**
+   * Submits the pipe metas a DataNode reported via heartbeat for handling and logs any failure.
+   *
+   * @param dataNodeId id of the DataNode whose heartbeat response carried the pipe metas
+   * @param pipeMetaByteBufferListFromDataNode serialized pipe metas collected from that DataNode
+   */
+  public void parseHeartbeat(
+      int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
+    final TSStatus result =
+        configManager
+            .getProcedureManager()
+            .pipeHandleMetaChange(dataNodeId, pipeMetaByteBufferListFromDataNode);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "PipeRuntimeCoordinator meets error in handling pipe meta change, status: ({})", result);
+    }
+  }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 2f812b173b2..2cd14b69fc8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -75,9 +75,10 @@ import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStat
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -408,6 +409,10 @@ public class ConfigPlanExecutor {
return pipeInfo
.getPipeTaskInfo()
.handleLeaderChange((PipeHandleLeaderChangePlan) physicalPlan);
+ case PipeHandleMetaChange:
+ return pipeInfo
+ .getPipeTaskInfo()
+ .handleMetaChanges((PipeHandleMetaChangePlan) physicalPlan);
case ADD_CQ:
return cqInfo.addCQ((AddCQPlan) physicalPlan);
case DROP_CQ:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 5ac5b7fca21..76ec9007ffa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -25,7 +25,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -231,6 +232,16 @@ public class PipeTaskInfo implements SnapshotProcessor {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+  public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+    // Drop everything the keeper currently holds, then register each pipe meta carried by the
+    // plan under its pipe name.
+    pipeMetaKeeper.clear();
+    plan.getPipeMetaList()
+        .forEach(meta -> pipeMetaKeeper.addPipeMeta(meta.getStaticMeta().getPipeName(), meta));
+
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
/////////////////////////////// Snapshot ///////////////////////////////
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
index e471329c8e6..6531f84e049 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
@@ -26,5 +26,5 @@ public enum PipeTaskOperation {
DROP_PIPE,
HANDLE_LEADER_CHANGE,
SYNC_PIPE_META,
- ;
+ HANDLE_PIPE_META_CHANGE
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 3fbe3091101..0c218184b5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
new file mode 100644
index 00000000000..db04d909f7d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Reconciles the pipe metas kept on the ConfigNode with the serialized pipe metas reported by one
+ * DataNode: advances progress indexes, copies runtime exceptions, stops pipes on critical ones.
+ */
+public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeHandleMetaChangeProcedure.class);
+
+  private int dataNodeId;
+  private final List<ByteBuffer> pipeMetaByteBufferListFromDataNode;
+
+  // Set in executeFromCalculateInfoForTask; the later execute steps return early when unset.
+  private boolean needWriteConsensusOnConfigNodes = false;
+  private boolean needPushPipeMetaToDataNodes = false;
+
+  /** Creates a procedure with an empty pipe meta list, as ProcedureFactory does. */
+  public PipeHandleMetaChangeProcedure() {
+    super();
+    pipeMetaByteBufferListFromDataNode = new ArrayList<>();
+  }
+
+  /** Creates a procedure for the serialized pipe metas reported by the given DataNode. */
+  public PipeHandleMetaChangeProcedure(
+      int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
+    super();
+    this.dataNodeId = dataNodeId;
+    this.pipeMetaByteBufferListFromDataNode = pipeMetaByteBufferListFromDataNode;
+  }
+
+  @Override
+  protected PipeTaskOperation getOperation() {
+    return PipeTaskOperation.HANDLE_PIPE_META_CHANGE;
+  }
+
+  @Override
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
+    // do nothing
+  }
+
+  @Override
+  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromCalculateInfoForTask");
+
+    // Read through duplicates so the stored buffers keep their position: they stay re-readable
+    // for a repeated run of this step, for serialize() and for equals()/hashCode().
+    final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromDataNode = new HashMap<>();
+    for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromDataNode) {
+      final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer.duplicate());
+      pipeMetaMapFromDataNode.put(pipeMeta.getStaticMeta(), pipeMeta);
+    }
+
+    for (final PipeMeta pipeMetaOnConfigNode :
+        env.getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .getPipeTaskInfo()
+            .getPipeMetaList()) {
+      final PipeMeta pipeMetaFromDataNode =
+          pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta());
+      if (pipeMetaFromDataNode == null) {
+        LOGGER.warn(
+            "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
+                + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}",
+            pipeMetaOnConfigNode);
+        continue;
+      }
+
+      final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapOnConfigNode =
+          pipeMetaOnConfigNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
+      final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapFromDataNode =
+          pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
+      for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> runtimeMetaOnConfigNode :
+          pipeTaskMetaMapOnConfigNode.entrySet()) {
+        // Only tasks whose region leader is the reporting DataNode are updated from its report.
+        final PipeTaskMeta taskMetaOnConfigNode = runtimeMetaOnConfigNode.getValue();
+        if (taskMetaOnConfigNode.getRegionLeader() != dataNodeId) {
+          continue;
+        }
+
+        final PipeTaskMeta runtimeMetaFromDataNode =
+            pipeTaskMetaMapFromDataNode.get(runtimeMetaOnConfigNode.getKey());
+        if (runtimeMetaFromDataNode == null) {
+          LOGGER.warn(
+              "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
+                  + "runtimeMetaFromDataNode is null, runtimeMetaOnConfigNode: {}",
+              runtimeMetaOnConfigNode);
+          continue;
+        }
+
+        // update progress index (it only ever moves forward)
+        if (taskMetaOnConfigNode.getProgressIndex() < runtimeMetaFromDataNode.getProgressIndex()) {
+          taskMetaOnConfigNode.setProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+          needWriteConsensusOnConfigNodes = true;
+        }
+
+        // update runtime exceptions; a critical one stops the pipe and must reach the DataNodes
+        taskMetaOnConfigNode.clearExceptionMessages();
+        for (final PipeRuntimeException exception :
+            runtimeMetaFromDataNode.getExceptionMessages()) {
+          taskMetaOnConfigNode.trackExceptionMessage(exception);
+          if (exception instanceof PipeRuntimeCriticalException) {
+            pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+            needWriteConsensusOnConfigNodes = true;
+            needPushPipeMetaToDataNodes = true;
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromWriteConfigNodeConsensus");
+    if (!needWriteConsensusOnConfigNodes) {
+      return;
+    }
+    // The plan carries the complete pipe meta list as currently returned by PipeTaskInfo.
+    final List<PipeMeta> pipeMetaList = new ArrayList<>();
+    for (final PipeMeta pipeMeta :
+        env.getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .getPipeTaskInfo()
+            .getPipeMetaList()) {
+      pipeMetaList.add(pipeMeta);
+    }
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new PipeHandleMetaChangePlan(pipeMetaList));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  @Override
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromOperateOnDataNodes");
+
+    if (!needPushPipeMetaToDataNodes) {
+      return;
+    }
+    pushPipeMetaToDataNodes(env);
+  }
+
+  @Override
+  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromValidateTask");
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromCalculateInfoForTask");
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromWriteConfigNodeConsensus");
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromOperateOnDataNodes");
+    // do nothing
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(dataNodeId, stream);
+
+    ReadWriteIOUtils.write(pipeMetaByteBufferListFromDataNode.size(), stream);
+    for (ByteBuffer pipeMetaByteBuffer : pipeMetaByteBufferListFromDataNode) {
+      // Write only the readable bytes; array() ignores position/offset and may be unavailable.
+      final ByteBuffer readableView = pipeMetaByteBuffer.duplicate();
+      final byte[] readableBytes = new byte[readableView.remaining()];
+      readableView.get(readableBytes);
+      ReadWriteIOUtils.write(readableBytes.length, stream);
+      ReadWriteIOUtils.write(new Binary(readableBytes), stream);
+    }
+
+    ReadWriteIOUtils.write(needWriteConsensusOnConfigNodes, stream);
+    ReadWriteIOUtils.write(needPushPipeMetaToDataNodes, stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+
+    dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; ++i) {
+      final int limit = ReadWriteIOUtils.readInt(byteBuffer);
+      final ByteBuffer pipeMetaByteBuffer =
+          ByteBuffer.wrap(ReadWriteIOUtils.readBinary(byteBuffer).getValues());
+      pipeMetaByteBuffer.limit(limit);
+      pipeMetaByteBufferListFromDataNode.add(pipeMetaByteBuffer);
+    }
+
+    needWriteConsensusOnConfigNodes = ReadWriteIOUtils.readBool(byteBuffer);
+    needPushPipeMetaToDataNodes = ReadWriteIOUtils.readBool(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof PipeHandleMetaChangeProcedure)) {
+      return false;
+    }
+    PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
+    return dataNodeId == that.dataNodeId
+        && needWriteConsensusOnConfigNodes == that.needWriteConsensusOnConfigNodes
+        && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes
+        && Objects.equals(
+            pipeMetaByteBufferListFromDataNode, that.pipeMetaByteBufferListFromDataNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        dataNodeId,
+        pipeMetaByteBufferListFromDataNode,
+        needWriteConsensusOnConfigNodes,
+        needPushPipeMetaToDataNodes);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 333b3e45f74..ab180dc8026 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
@@ -127,6 +128,9 @@ public class ProcedureFactory implements IProcedureFactory {
case PIPE_META_SYNC_PROCEDURE:
procedure = new PipeMetaSyncProcedure();
break;
+ case PIPE_HANDLE_META_CHANGE_PROCEDURE:
+ procedure = new PipeHandleMetaChangeProcedure();
+ break;
case CREATE_CQ_PROCEDURE:
procedure =
new CreateCQProcedure(
@@ -216,6 +220,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE;
} else if (procedure instanceof PipeMetaSyncProcedure) {
return ProcedureType.PIPE_META_SYNC_PROCEDURE;
+ } else if (procedure instanceof PipeHandleMetaChangeProcedure) {
+ return ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE;
}
return null;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 39541414d11..12839b5b412 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -74,7 +74,8 @@ public enum ProcedureType {
/** Pipe Runtime */
PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100),
- PIPE_META_SYNC_PROCEDURE((short) 1101);
+ PIPE_META_SYNC_PROCEDURE((short) 1101),
+ PIPE_HANDLE_META_CHANGE_PROCEDURE((short) 1102);
private final short typeCode;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index ef8fdef3994..9a83e00acd7 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -91,9 +92,10 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -1136,6 +1138,46 @@ public class ConfigPhysicalPlanSerDeTest {
pipeHandleLeaderChangePlan1.getConsensusGroupId2NewDataRegionLeaderIdMap());
}
+ /**
+  * Verifies that a {@link PipeHandleMetaChangePlan} keeps its pipe meta list across a
+  * serialize / deserialize round trip through {@code ConfigPhysicalPlan.Factory}.
+  */
+ @Test
+ public void pipeHandleMetaChangePlanTest() throws IOException {
+   // Typed maps replace the raw, double-brace-initialized HashMaps: no unchecked conversions
+   // and no anonymous HashMap subclasses.
+   final HashMap<String, String> collectorAttributes = new HashMap<>();
+   collectorAttributes.put("collector-key", "collector-value");
+
+   final HashMap<String, String> processorAttributes = new HashMap<>();
+   processorAttributes.put("processor-key-1", "processor-value-1");
+   processorAttributes.put("processor-key-2", "processor-value-2");
+
+   final HashMap<String, String> emptyAttributes = new HashMap<>();
+
+   PipeStaticMeta pipeStaticMeta =
+       new PipeStaticMeta(
+           "pipeName", 123L, collectorAttributes, processorAttributes, emptyAttributes);
+
+   final HashMap<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMeta = new HashMap<>();
+   consensusGroupIdToTaskMeta.put(
+       new TConsensusGroupId(TConsensusGroupType.DataRegion, 456), new PipeTaskMeta(789, 987));
+   consensusGroupIdToTaskMeta.put(
+       new TConsensusGroupId(TConsensusGroupType.DataRegion, 123), new PipeTaskMeta(456, 789));
+   PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMeta);
+
+   List<PipeMeta> pipeMetaList = new ArrayList<>();
+   pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
+
+   PipeHandleMetaChangePlan pipeHandleMetaChangePlan1 = new PipeHandleMetaChangePlan(pipeMetaList);
+   PipeHandleMetaChangePlan pipeHandleMetaChangePlan2 =
+       (PipeHandleMetaChangePlan)
+           ConfigPhysicalPlan.Factory.create(pipeHandleMetaChangePlan1.serializeToByteBuffer());
+   Assert.assertEquals(
+       pipeHandleMetaChangePlan1.getPipeMetaList(), pipeHandleMetaChangePlan2.getPipeMetaList());
+ }
+
@Test
public void GetTriggerTablePlanTest() throws IOException {
GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan(true);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index a547fdde521..0246759c98c 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -73,7 +73,6 @@ public class PipeInfoTest {
processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
- pipeTaskMeta.trackException(true, "someError");
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
new file mode 100644
index 00000000000..07d74e2d9ff
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Checks that a {@link PipeHandleMetaChangeProcedure} written to a stream can be rebuilt by
+ * {@link ProcedureFactory} into an equal procedure.
+ */
+public class PipeHandleMetaChangeProcedureTest {
+
+  @Test
+  public void serializeDeserializeTest() throws IOException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    // Typed maps replace the raw, double-brace-initialized HashMaps: no unchecked conversions
+    // and no anonymous HashMap subclasses.
+    final HashMap<String, String> collectorAttributes = new HashMap<>();
+    collectorAttributes.put("collector-key", "collector-value");
+
+    final HashMap<String, String> processorAttributes = new HashMap<>();
+    processorAttributes.put("processor-key-1", "processor-value-1");
+    processorAttributes.put("processor-key-2", "processor-value-2");
+
+    final HashMap<String, String> emptyAttributes = new HashMap<>();
+
+    PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            "pipeName", 123L, collectorAttributes, processorAttributes, emptyAttributes);
+
+    final HashMap<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMeta = new HashMap<>();
+    consensusGroupIdToTaskMeta.put(
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, 456), new PipeTaskMeta(789, 987));
+    consensusGroupIdToTaskMeta.put(
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, 123), new PipeTaskMeta(456, 789));
+    PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMeta);
+
+    PipeHandleMetaChangeProcedure proc =
+        new PipeHandleMetaChangeProcedure(
+            123,
+            Collections.singletonList(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta).serialize()));
+
+    try {
+      proc.serialize(outputStream);
+      // Wrap only the bytes actually written; the backing array may be larger.
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+      PipeHandleMetaChangeProcedure proc2 =
+          (PipeHandleMetaChangeProcedure) ProcedureFactory.getInstance().create(buffer);
+
+      assertEquals(proc, proc2);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 213c3da3a22..bb5149fb1f2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,16 +62,12 @@ public class PipeTaskMeta {
return exceptionMessages;
}
- public void mergeExceptionMessages(
- Collection<? extends PipeRuntimeException> newExceptionMessages) {
- exceptionMessages.addAll(newExceptionMessages);
+ public void trackExceptionMessage(PipeRuntimeException exceptionMessage) {
+ exceptionMessages.add(exceptionMessage);
}
- public void trackException(boolean critical, String message) {
- exceptionMessages.add(
- critical
- ? new PipeRuntimeCriticalException(message)
- : new PipeRuntimeNonCriticalException(message));
+ public void clearExceptionMessages() {
+ exceptionMessages.clear();
}
public void setProgressIndex(long progressIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index e231d818dbf..2ff33b450c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -31,12 +31,18 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder;
import org.apache.iotdb.db.pipe.task.PipeTask;
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -430,6 +436,12 @@ public class PipeTaskAgent {
// set pipe meta status to RUNNING
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+ // clear exception messages if started successfully
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .values()
+ .forEach(PipeTaskMeta::clearExceptionMessages);
}
private void stopPipe(String pipeName, long creationTime) {
@@ -539,4 +551,23 @@ public class PipeTaskAgent {
pipeTask.stop();
}
}
+
+ ///////////////////////// Heartbeat /////////////////////////
+
+ /**
+  * Puts the serialized form of every pipe meta returned by {@code pipeMetaKeeper} into
+  * {@code resp}, but only when the heartbeat request asks for the pipe meta list.
+  *
+  * @param req heartbeat request carrying the {@code needPipeMetaList} flag
+  * @param resp heartbeat response that receives the serialized pipe metas
+  * @throws TException wrapping the {@link IOException} raised while serializing a pipe meta
+  */
+ public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
+     throws TException {
+   if (req.isNeedPipeMetaList()) {
+     final List<ByteBuffer> serializedPipeMetas = new ArrayList<>();
+     try {
+       for (final PipeMeta meta : pipeMetaKeeper.getPipeMetaList()) {
+         serializedPipeMetas.add(meta.serialize());
+       }
+     } catch (IOException e) {
+       throw new TException(e);
+     }
+     resp.setPipeMetaList(serializedPipeMetas);
+   }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 86feb49b8db..af62026464e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1023,6 +1023,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
// Update schema quota if necessary
SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount, resp);
+
+ // Update pipe meta if necessary
+ PipeAgent.task().collectPipeMetaList(req, resp);
+
return resp;
}
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index d467625f485..d519a2e80e4 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -248,6 +248,7 @@ struct THeartbeatReq {
5: optional list<i32> schemaRegionIds
6: optional list<i32> dataRegionIds
7: optional map<string, common.TSpaceQuota> spaceQuotaUsage
+ 8: optional bool needPipeMetaList
}
struct THeartbeatResp {
@@ -261,6 +262,7 @@ struct THeartbeatResp {
8: optional map<i32, i64> regionDisk
// TODO: schemaLimitLevel can be removed if confignode support hot load configuration
9: optional TSchemaLimitLevel schemaLimitLevel
+ 10: optional list<binary> pipeMetaList
}
enum TSchemaLimitLevel{