You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/20 17:29:33 UTC

[iotdb] branch master updated: [IOTDB-5898] Pipe: pipe meta sync scheduler (#9887)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b905b4a831c [IOTDB-5898] Pipe: pipe meta sync scheduler (#9887)
b905b4a831c is described below

commit b905b4a831cfdb05e1088dc531da4852b6d14672
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun May 21 01:29:25 2023 +0800

    [IOTDB-5898] Pipe: pipe meta sync scheduler (#9887)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../statemachine/ConfigRegionStateMachine.java     |   2 +
 .../iotdb/confignode/manager/ProcedureManager.java |  18 +++
 .../confignode/manager/pipe/PipeMetaSyncer.java    |  80 +++++++++++++
 .../manager/pipe/PipeRuntimeCoordinator.java       |  11 ++
 .../persistence/pipe/PipeTaskOperation.java        |   1 +
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   | 126 +++++++++++++++++++++
 .../procedure/store/ProcedureFactory.java          |   6 +
 .../confignode/procedure/store/ProcedureType.java  |   3 +-
 .../pipe/runtime/PipeMetaSyncProcedureTest.java    |  55 +++++++++
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../db/pipe/agent/runtime/MetaSyncScheduler.java   |  22 ----
 11 files changed, 302 insertions(+), 23 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 168b8fc62d8..53e7f9ae688 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -218,6 +218,7 @@ public class ConfigRegionStateMachine
       // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
       // initialized after notifyLeaderChanged finished
       threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
+      configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
@@ -226,6 +227,7 @@ public class ConfigRegionStateMachine
           newLeaderId);
 
       // Stop leader scheduling services
+      configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
       configManager.getLoadManager().stopLoadServices();
       configManager.getProcedureManager().shiftExecutor(false);
       configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 272053faf6c..4d67bd09a0e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
@@ -754,6 +755,23 @@ public class ProcedureManager {
     }
   }
 
+  public TSStatus pipeMetaSync() {
+    try {
+      long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure());
+      List<TSStatus> statusList = new ArrayList<>();
+      boolean isSucceed =
+          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+      if (isSucceed) {
+        return RpcUtils.SUCCESS_STATUS;
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage(statusList.get(0).getMessage());
+      }
+    } catch (Exception e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
   /**
    * Waiting until the specific procedures finished
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
new file mode 100644
index 00000000000..515abfdbdd1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class PipeMetaSyncer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncer.class);
+
+  private static final ScheduledExecutorService SYNC_EXECUTOR =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_META_SYNC_SERVICE.getName());
+  // TODO: make this configurable
+  private static final long SYNC_INTERVAL_MINUTES = 3;
+
+  private final ConfigManager configManager;
+
+  private Future<?> metaSyncFuture;
+
+  public PipeMetaSyncer(ConfigManager configManager) {
+    this.configManager = configManager;
+  }
+
+  public synchronized void start() {
+    if (metaSyncFuture == null) {
+      metaSyncFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
+      LOGGER.info("PipeMetaSyncer is started successfully.");
+    }
+  }
+
+  private void sync() {
+    final TSStatus status = configManager.getProcedureManager().pipeMetaSync();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "PipeMetaSyncer meets error in syncing pipe meta, code: {}, message: {}",
+          status.getCode(),
+          status.getMessage());
+    }
+  }
+
+  public synchronized void stop() {
+    if (metaSyncFuture != null) {
+      metaSyncFuture.cancel(false);
+      metaSyncFuture = null;
+      LOGGER.info("PipeMetaSyncer is stopped successfully.");
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
index 93065513139..81df8587176 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
@@ -41,8 +41,11 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
 
   private final ConfigManager configManager;
 
+  private final PipeMetaSyncer pipeMetaSyncer;
+
   public PipeRuntimeCoordinator(ConfigManager configManager) {
     this.configManager = configManager;
+    this.pipeMetaSyncer = new PipeMetaSyncer(configManager);
   }
 
   @Override
@@ -84,4 +87,12 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
           result);
     }
   }
+
+  public void startPipeMetaSync() {
+    pipeMetaSyncer.start();
+  }
+
+  public void stopPipeMetaSync() {
+    pipeMetaSyncer.stop();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
index ccded2269b2..e471329c8e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
@@ -25,5 +25,6 @@ public enum PipeTaskOperation {
   STOP_PIPE,
   DROP_PIPE,
   HANDLE_LEADER_CHANGE,
+  SYNC_PIPE_META,
   ;
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
new file mode 100644
index 00000000000..167008e2ed3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
+
+  public PipeMetaSyncProcedure() {
+    super();
+  }
+
+  @Override
+  protected PipeTaskOperation getOperation() {
+    return PipeTaskOperation.SYNC_PIPE_META;
+  }
+
+  @Override
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
+
+    // do nothing
+  }
+
+  @Override
+  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
+
+    // do nothing
+  }
+
+  @Override
+  protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
+
+    // do nothing
+  }
+
+  @Override
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+    LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
+
+    pushPipeMetaToDataNodes(env);
+  }
+
+  @Override
+  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeMetaSyncProcedure: rollbackFromValidateTask");
+
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
+
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeMetaSyncProcedure: rollbackFromWriteConfigNodeConsensus");
+
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+    LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
+
+    pushPipeMetaToDataNodes(env);
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.PIPE_META_SYNC_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    return o instanceof PipeMetaSyncProcedure;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index a24475fc8c9..333b3e45f74 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
@@ -123,6 +124,9 @@ public class ProcedureFactory implements IProcedureFactory {
       case PIPE_HANDLE_LEADER_CHANGE_PROCEDURE:
         procedure = new PipeHandleLeaderChangeProcedure();
         break;
+      case PIPE_META_SYNC_PROCEDURE:
+        procedure = new PipeMetaSyncProcedure();
+        break;
       case CREATE_CQ_PROCEDURE:
         procedure =
             new CreateCQProcedure(
@@ -210,6 +214,8 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.DROP_PIPE_PROCEDURE_V2;
     } else if (procedure instanceof PipeHandleLeaderChangeProcedure) {
       return ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE;
+    } else if (procedure instanceof PipeMetaSyncProcedure) {
+      return ProcedureType.PIPE_META_SYNC_PROCEDURE;
     }
     return null;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 8160be0346b..39541414d11 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -73,7 +73,8 @@ public enum ProcedureType {
   DROP_PIPE_PROCEDURE_V2((short) 1003),
 
   /** Pipe Runtime */
-  PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100);
+  PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100),
+  PIPE_META_SYNC_PROCEDURE((short) 1101);
 
   private final short typeCode;
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedureTest.java
new file mode 100644
index 00000000000..d3b0508166a
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedureTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PipeMetaSyncProcedureTest {
+
+  @Test
+  public void serializeDeserializeTest() {
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    final PipeMetaSyncProcedure proc1 = new PipeMetaSyncProcedure();
+
+    try {
+      proc1.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      final PipeMetaSyncProcedure proc2 =
+          (PipeMetaSyncProcedure) ProcedureFactory.getInstance().create(buffer);
+
+      assertEquals(proc1, proc2);
+    } catch (Exception e) {
+      fail();
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 5d6fa3b53b4..d156287e1f3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -71,6 +71,7 @@ public enum ThreadName {
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
+  PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
   ;
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
deleted file mode 100644
index 366176cd1cd..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-public class MetaSyncScheduler {}