You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/28 15:41:55 UTC

[iotdb] 01/02: progress index impl for simple consensus

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5723
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5d8765d939929a9c0c4ad8a4f2ffe9f86805cf20
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 28 22:41:52 2023 +0800

    progress index impl for simple consensus
---
 .../commons/consensus/index/ProgressIndexType.java |   6 +
 .../consensus/index/impl/SimpleProgressIndex.java  | 160 +++++++++++++++++++++
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  17 ++-
 .../SimpleConsensusProgressIndexAssigner.java      | 108 ++++++++++++++
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +-
 5 files changed, 291 insertions(+), 2 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index 37d83a52efb..02afa4045df 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.consensus.index;
 
 import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -31,6 +32,7 @@ import java.nio.ByteBuffer;
 public enum ProgressIndexType {
   MINIMUM_CONSENSUS_INDEX((short) 1),
   IOT_CONSENSUS_INDEX((short) 2),
+  SIMPLE_CONSENSUS_INDEX((short) 3),
   ;
 
   private final short type;
@@ -58,6 +60,8 @@ public enum ProgressIndexType {
         return MinimumProgressIndex.deserializeFrom(byteBuffer);
       case 2:
         return IoTProgressIndex.deserializeFrom(byteBuffer);
+      case 3:
+        return SimpleProgressIndex.deserializeFrom(byteBuffer);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
@@ -71,6 +75,8 @@ public enum ProgressIndexType {
         return MinimumProgressIndex.deserializeFrom(stream);
       case 2:
         return IoTProgressIndex.deserializeFrom(stream);
+      case 3:
+        return SimpleProgressIndex.deserializeFrom(stream);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
new file mode 100644
index 00000000000..6571d90759a
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class SimpleProgressIndex implements ProgressIndex {
+
+  private final int rebootTimes;
+  private final long memtableFlushOrderId;
+
+  public SimpleProgressIndex(int rebootTimes, long memtableFlushOrderId) {
+    this.rebootTimes = rebootTimes;
+    this.memtableFlushOrderId = memtableFlushOrderId;
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(byteBuffer);
+
+    ReadWriteIOUtils.write(rebootTimes, byteBuffer);
+    ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(stream);
+
+    ReadWriteIOUtils.write(rebootTimes, stream);
+    ReadWriteIOUtils.write(memtableFlushOrderId, stream);
+  }
+
+  @Override
+  public boolean isAfter(ProgressIndex progressIndex) {
+    if (progressIndex instanceof MinimumProgressIndex) {
+      return true;
+    }
+
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return false;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex;
+    if (thisSimpleProgressIndex.rebootTimes > thatSimpleProgressIndex.rebootTimes) {
+      return true;
+    }
+    if (thisSimpleProgressIndex.rebootTimes < thatSimpleProgressIndex.rebootTimes) {
+      return false;
+    }
+    // thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes
+    return thisSimpleProgressIndex.memtableFlushOrderId
+        > thatSimpleProgressIndex.memtableFlushOrderId;
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return false;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex;
+    return thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes
+        && thisSimpleProgressIndex.memtableFlushOrderId
+            == thatSimpleProgressIndex.memtableFlushOrderId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof SimpleProgressIndex)) {
+      return false;
+    }
+    return this.equals((SimpleProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex progressIndex) {
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return this;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex;
+    if (thisSimpleProgressIndex.rebootTimes > thatSimpleProgressIndex.rebootTimes) {
+      return this;
+    }
+    if (thisSimpleProgressIndex.rebootTimes < thatSimpleProgressIndex.rebootTimes) {
+      return progressIndex;
+    }
+    // thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes
+    if (thisSimpleProgressIndex.memtableFlushOrderId
+        > thatSimpleProgressIndex.memtableFlushOrderId) {
+      return this;
+    }
+    if (thisSimpleProgressIndex.memtableFlushOrderId
+        < thatSimpleProgressIndex.memtableFlushOrderId) {
+      return progressIndex;
+    }
+    // thisSimpleProgressIndex.memtableFlushOrderId == thatSimpleProgressIndex.memtableFlushOrderId
+    return this;
+  }
+
+  public static SimpleProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final int rebootTimes = ReadWriteIOUtils.readInt(byteBuffer);
+    final long memtableFlushOrderId = ReadWriteIOUtils.readLong(byteBuffer);
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
+  }
+
+  public static SimpleProgressIndex deserializeFrom(InputStream stream) throws IOException {
+    final int rebootTimes = ReadWriteIOUtils.readInt(stream);
+    final long memtableFlushOrderId = ReadWriteIOUtils.readLong(stream);
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
+  }
+
+  @Override
+  public String toString() {
+    return "SimpleProgressIndex{"
+        + "rebootTimes="
+        + rebootTimes
+        + ", memtableFlushOrderId="
+        + memtableFlushOrderId
+        + '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 5253d5a2db2..486afe825e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -38,9 +39,15 @@ public class PipeRuntimeAgent implements IService {
 
   private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
-  public synchronized void launchPipePluginAgent(
+  private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
+      new SimpleConsensusProgressIndexAssigner();
+
+  //////////////////////////// System Service Interface ////////////////////////////
+
+  public synchronized void preparePipeResources(
       ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
     PipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
+    simpleConsensusProgressIndexAssigner.start();
   }
 
   @Override
@@ -69,6 +76,14 @@ public class PipeRuntimeAgent implements IService {
     return ServiceType.PIPE_RUNTIME_AGENT;
   }
 
+  ////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////
+
+  public SimpleProgressIndex assignSimpleProgressIndex() {
+    return simpleConsensusProgressIndexAssigner.assign();
+  }
+
+  //////////////////////////// Runtime Exception Handlers ////////////////////////////
+
   public void report(PipeSubtask subtask) {
     // TODO: terminate the task by the given taskID
     LOGGER.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
new file mode 100644
index 00000000000..195637a63de
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+
+public class SimpleConsensusProgressIndexAssigner {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SimpleConsensusProgressIndexAssigner.class);
+
+  private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final String PIPE_SYSTEM_DIR =
+      IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+          + File.separator
+          + "pipe"
+          + File.separator;
+  private static final String REBOOT_TIMES_FILE_NAME = "reboot_times.txt";
+
+  private boolean isEnable = false;
+
+  private int rebootTimes = 0;
+  private final AtomicLong memtableFlushOrderId = new AtomicLong(0);
+
+  public void start() throws StartupException {
+    // only works for simple consensus
+    if (!IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
+      return;
+    }
+
+    isEnable = true;
+    LOGGER.info("Start SimpleConsensusProgressIndexAssigner ...");
+
+    try {
+      makeDirIfNecessary();
+      parseRebootTimes();
+      recordRebootTimes();
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void makeDirIfNecessary() throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR);
+    if (file.exists() && file.isDirectory()) {
+      return;
+    }
+    FileUtils.forceMkdir(file);
+  }
+
+  private void parseRebootTimes() {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME);
+    if (!file.exists()) {
+      rebootTimes = 0;
+      return;
+    }
+    try {
+      String content = FileUtils.readFileToString(file, "UTF-8");
+      rebootTimes = Integer.parseInt(content);
+    } catch (IOException e) {
+      LOGGER.error("Cannot parse reboot times from file {}", file.getAbsolutePath(), e);
+      rebootTimes = 0;
+    }
+  }
+
+  private void recordRebootTimes() throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME);
+    FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), "UTF-8");
+  }
+
+  public SimpleProgressIndex assign() {
+    return isEnable
+        ? new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement())
+        : null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index db28843d7c7..e153f902fbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -836,7 +836,7 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void preparePipeResources() throws StartupException {
-    PipeAgent.runtime().launchPipePluginAgent(resourcesInformationHolder);
+    PipeAgent.runtime().preparePipeResources(resourcesInformationHolder);
   }
 
   private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {