You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/20 17:29:33 UTC
[iotdb] branch master updated: [IOTDB-5898] Pipe: pipe meta sync schedular (#9887)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b905b4a831c [IOTDB-5898] Pipe: pipe meta sync schedular (#9887)
b905b4a831c is described below
commit b905b4a831cfdb05e1088dc531da4852b6d14672
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun May 21 01:29:25 2023 +0800
[IOTDB-5898] Pipe: pipe meta sync schedular (#9887)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../statemachine/ConfigRegionStateMachine.java | 2 +
.../iotdb/confignode/manager/ProcedureManager.java | 18 +++
.../confignode/manager/pipe/PipeMetaSyncer.java | 80 +++++++++++++
.../manager/pipe/PipeRuntimeCoordinator.java | 11 ++
.../persistence/pipe/PipeTaskOperation.java | 1 +
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 126 +++++++++++++++++++++
.../procedure/store/ProcedureFactory.java | 6 +
.../confignode/procedure/store/ProcedureType.java | 3 +-
.../pipe/runtime/PipeMetaSyncProcedureTest.java | 55 +++++++++
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../db/pipe/agent/runtime/MetaSyncScheduler.java | 22 ----
11 files changed, 302 insertions(+), 23 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 168b8fc62d8..53e7f9ae688 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -218,6 +218,7 @@ public class ConfigRegionStateMachine
// 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
// initialized after notifyLeaderChanged finished
threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
+ configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
} else {
LOGGER.info(
"Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
@@ -226,6 +227,7 @@ public class ConfigRegionStateMachine
newLeaderId);
// Stop leader scheduling services
+ configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
configManager.getLoadManager().stopLoadServices();
configManager.getProcedureManager().shiftExecutor(false);
configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 272053faf6c..4d67bd09a0e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
@@ -754,6 +755,23 @@ public class ProcedureManager {
}
}
+ public TSStatus pipeMetaSync() {
+ try {
+ long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure());
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ } catch (Exception e) {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
/**
* Waiting until the specific procedures finished
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
new file mode 100644
index 00000000000..515abfdbdd1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class PipeMetaSyncer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncer.class);
+
+ private static final ScheduledExecutorService SYNC_EXECUTOR =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_META_SYNC_SERVICE.getName());
+ // TODO: make this configurable
+ private static final long SYNC_INTERVAL_MINUTES = 3;
+
+ private final ConfigManager configManager;
+
+ private Future<?> metaSyncFuture;
+
+ public PipeMetaSyncer(ConfigManager configManager) {
+ this.configManager = configManager;
+ }
+
+ public synchronized void start() {
+ if (metaSyncFuture == null) {
+ metaSyncFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
+ LOGGER.info("PipeMetaSyncer is started successfully.");
+ }
+ }
+
+ private void sync() {
+ final TSStatus status = configManager.getProcedureManager().pipeMetaSync();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "PipeMetaSyncer meets error in syncing pipe meta, code: {}, message: {}",
+ status.getCode(),
+ status.getMessage());
+ }
+ }
+
+ public synchronized void stop() {
+ if (metaSyncFuture != null) {
+ metaSyncFuture.cancel(false);
+ metaSyncFuture = null;
+ LOGGER.info("PipeMetaSyncer is stopped successfully.");
+ }
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
index 93065513139..81df8587176 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
@@ -41,8 +41,11 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
private final ConfigManager configManager;
+ private final PipeMetaSyncer pipeMetaSyncer;
+
public PipeRuntimeCoordinator(ConfigManager configManager) {
this.configManager = configManager;
+ this.pipeMetaSyncer = new PipeMetaSyncer(configManager);
}
@Override
@@ -84,4 +87,12 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
result);
}
}
+
+ public void startPipeMetaSync() {
+ pipeMetaSyncer.start();
+ }
+
+ public void stopPipeMetaSync() {
+ pipeMetaSyncer.stop();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
index ccded2269b2..e471329c8e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
@@ -25,5 +25,6 @@ public enum PipeTaskOperation {
STOP_PIPE,
DROP_PIPE,
HANDLE_LEADER_CHANGE,
+ SYNC_PIPE_META,
;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
new file mode 100644
index 00000000000..167008e2ed3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
+
+ public PipeMetaSyncProcedure() {
+ super();
+ }
+
+ @Override
+ protected PipeTaskOperation getOperation() {
+ return PipeTaskOperation.SYNC_PIPE_META;
+ }
+
+ @Override
+ protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
+
+ // do nothing
+ }
+
+ @Override
+ protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
+
+ // do nothing
+ }
+
+ @Override
+ protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+ LOGGER.info("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
+
+ // do nothing
+ }
+
+ @Override
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+ LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
+
+ pushPipeMetaToDataNodes(env);
+ }
+
+ @Override
+ protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("PipeMetaSyncProcedure: rollbackFromValidateTask");
+
+ // do nothing
+ }
+
+ @Override
+ protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
+
+ // do nothing
+ }
+
+ @Override
+ protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+ LOGGER.info("PipeMetaSyncProcedure: rollbackFromWriteConfigNodeConsensus");
+
+ // do nothing
+ }
+
+ @Override
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+ LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
+
+ pushPipeMetaToDataNodes(env);
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.PIPE_META_SYNC_PROCEDURE.getTypeCode());
+ super.serialize(stream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o instanceof PipeMetaSyncProcedure;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index a24475fc8c9..333b3e45f74 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
@@ -123,6 +124,9 @@ public class ProcedureFactory implements IProcedureFactory {
case PIPE_HANDLE_LEADER_CHANGE_PROCEDURE:
procedure = new PipeHandleLeaderChangeProcedure();
break;
+ case PIPE_META_SYNC_PROCEDURE:
+ procedure = new PipeMetaSyncProcedure();
+ break;
case CREATE_CQ_PROCEDURE:
procedure =
new CreateCQProcedure(
@@ -210,6 +214,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.DROP_PIPE_PROCEDURE_V2;
} else if (procedure instanceof PipeHandleLeaderChangeProcedure) {
return ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE;
+ } else if (procedure instanceof PipeMetaSyncProcedure) {
+ return ProcedureType.PIPE_META_SYNC_PROCEDURE;
}
return null;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 8160be0346b..39541414d11 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -73,7 +73,8 @@ public enum ProcedureType {
DROP_PIPE_PROCEDURE_V2((short) 1003),
/** Pipe Runtime */
- PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100);
+ PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100),
+ PIPE_META_SYNC_PROCEDURE((short) 1101);
private final short typeCode;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedureTest.java
new file mode 100644
index 00000000000..d3b0508166a
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedureTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PipeMetaSyncProcedureTest {
+
+ @Test
+ public void serializeDeserializeTest() {
+ final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+ final PipeMetaSyncProcedure proc1 = new PipeMetaSyncProcedure();
+
+ try {
+ proc1.serialize(outputStream);
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+ final PipeMetaSyncProcedure proc2 =
+ (PipeMetaSyncProcedure) ProcedureFactory.getInstance().create(buffer);
+
+ assertEquals(proc1, proc2);
+ } catch (Exception e) {
+ fail();
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 5d6fa3b53b4..d156287e1f3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -71,6 +71,7 @@ public enum ThreadName {
PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
+ PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
;
private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
deleted file mode 100644
index 366176cd1cd..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-public class MetaSyncScheduler {}