You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:02 UTC

[iotdb] 02/09: add interface

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4edae81a88055677686fcc5a84774685648453ac
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 14:54:49 2021 +0800

    add interface
---
 .../log/sequencing/AsynchronousSequencer.java      | 47 +++++++++++
 .../iotdb/cluster/log/sequencing/LogSequencer.java | 42 ++++++++++
 .../log/sequencing/SynchronousSequencer.java       | 95 ++++++++++++++++++++++
 .../cluster/server/member/DataGroupMember.java     |  2 +
 .../cluster/server/member/MetaGroupMember.java     |  2 +
 .../iotdb/cluster/server/member/RaftMember.java    | 74 +++++++----------
 6 files changed, 218 insertions(+), 44 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
new file mode 100644
index 0000000..1e089c6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+
+/**
+ * AsynchronousSequencer is a LogSequencer whose sequencing logic is not implemented yet.
+ */
+public class AsynchronousSequencer implements LogSequencer {
+
+  private final RaftMember member;
+  private RaftLogManager logManager;
+
+  public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) {
+    this.member = member;
+    this.logManager = logManager;
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws UnsupportedOperationException always; callers dereference the returned request at once,
+   *     so returning null would only surface later as a NullPointerException.
+   */
+  @Override
+  public SendLogRequest sequence(Log log) {
+    throw new UnsupportedOperationException("AsynchronousSequencer is not implemented yet");
+  }
+
+  @Override
+  public void setLogManager(RaftLogManager logManager) {
+    this.logManager = logManager;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java
new file mode 100644
index 0000000..b418162
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+
+/**
+ * LogSequencer assigns a unique index and associated term to a log entry and offers the entry to a
+ * LogDispatcher which will send the entry to followers.
+ */
+public interface LogSequencer {
+
+  /**
+   * Assigns a unique index and associated term to a log entry, then offers it to a LogDispatcher.
+   *
+   * @param log a log entry that is not yet indexed.
+   * @return A SendLogRequest through which the caller can monitor the status of the sending entry.
+   */
+  SendLogRequest sequence(Log log);
+
+  /** Replaces the RaftLogManager this sequencer works with. */
+  void setLogManager(RaftLogManager logManager);
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
new file mode 100644
index 0000000..4327065
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * SynchronousSequencer performs sequencing by taking the monitor of a LogManager within the caller
+ * thread.
+ */
+public class SynchronousSequencer implements LogSequencer {
+
+  private final RaftMember member;
+  private RaftLogManager logManager;
+
+  public SynchronousSequencer(RaftMember member, RaftLogManager logManager) {
+    this.member = member;
+    this.logManager = logManager;
+  }
+
+  /**
+   * Assigns the member's current term and the next log index to the entry, appends it to the log
+   * manager and offers it to the member's LogDispatcher, all while holding the log manager's
+   * monitor. For a PhysicalPlanLog, the wrapped plan's index is set to the same index.
+   *
+   * @param log a log entry that is not yet indexed.
+   * @return the SendLogRequest that was offered to the LogDispatcher.
+   */
+  @Override
+  public SendLogRequest sequence(Log log) {
+    SendLogRequest sendLogRequest;
+    // Read the field once: the monitor we hold and the manager we append to must be the same
+    // instance even if setLogManager() replaces the field in the meantime.
+    RaftLogManager manager = logManager;
+
+    long startTime =
+        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
+    synchronized (manager) {
+      Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
+          startTime);
+
+      log.setCurrLogTerm(member.getTerm().get());
+      long nextIndex = manager.getLastLogIndex() + 1;
+      log.setCurrLogIndex(nextIndex);
+      // The caller can only guess the index before the monitor is taken, so the plan's index is
+      // overwritten here, where it is guaranteed to match the index given to the entry.
+      if (log instanceof PhysicalPlanLog) {
+        ((PhysicalPlanLog) log).getPlan().setIndex(nextIndex);
+      }
+
+      startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+
+      // logDispatcher will serialize log, and set log size, and we will use the size after it
+      manager.append(log);
+      Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
+
+      startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
+      sendLogRequest = buildSendLogRequest(log);
+      Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
+
+      startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+      log.setCreateTime(System.nanoTime());
+      member.getLogDispatcher().offer(sendLogRequest);
+      Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+    }
+    return sendLogRequest;
+  }
+
+  @Override
+  public void setLogManager(RaftLogManager logManager) {
+    this.logManager = logManager;
+  }
+
+  /**
+   * Builds the SendLogRequest for an indexed entry: a vote counter initialized to half the group
+   * size, a cleared leadership-stale flag, the member's current term and an AppendEntryRequest
+   * obtained from the member.
+   */
+  private SendLogRequest buildSendLogRequest(Log log) {
+    AtomicInteger voteCounter = new AtomicInteger(member.getAllNodes().size() / 2);
+    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+    AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
+
+    long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
+    AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false);
+    Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
+
+    return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index f4a750b..6b66216 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
 import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
+import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
@@ -210,6 +211,7 @@ public class DataGroupMember extends RaftMember {
     logManager =
         new FilePartitionedSnapshotLogManager(
             dataLogApplier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
+    logSequencer = new SynchronousSequencer(this, logManager);
     initPeerMap();
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index bacacab..ef3ca8b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
 import org.apache.iotdb.cluster.partition.NodeAdditionResult;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
@@ -265,6 +266,7 @@ public class MetaGroupMember extends RaftMember {
     // committed logs are applied to the state machine (the IoTDB instance) through the applier
     LogApplier metaLogApplier = new MetaLogApplier(this);
     logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
+    logSequencer = new SynchronousSequencer(this, logManager);
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 774e269..8cb88b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -244,6 +245,8 @@ public abstract class RaftMember {
    */
   protected PlanExecutor localExecutor;
 
+  protected LogSequencer logSequencer;
+
   protected RaftMember() {}
 
   protected RaftMember(
@@ -328,6 +331,9 @@ public abstract class RaftMember {
       this.logManager.close();
     }
     this.logManager = logManager;
+    if (logSequencer != null) {
+      logSequencer.setLogManager(logManager);
+    }
   }
 
   /**
@@ -1060,54 +1066,33 @@ public abstract class RaftMember {
     if (readOnly) {
       return StatusUtils.NODE_READ_ONLY;
     }
-    // assign term and index to the new log and append it
-    SendLogRequest sendLogRequest;
 
-    long startTime =
-        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
     Log log;
-    synchronized (logManager) {
-      Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
-          startTime);
-
-      if (plan instanceof LogPlan) {
-        try {
-          log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-        } catch (UnknownLogTypeException e) {
-          logger.error("Can not parse LogPlan {}", plan, e);
-          return StatusUtils.PARSE_LOG_ERROR;
-        }
-      } else {
-        log = new PhysicalPlanLog();
-        ((PhysicalPlanLog) log).setPlan(plan);
-        plan.setIndex(logManager.getLastLogIndex() + 1);
-      }
-      log.setCurrLogTerm(getTerm().get());
-      log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
-      startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
-      // just like processPlanLocally,we need to check the size of log
-      if (log.serialize().capacity() + Integer.BYTES
-          >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-        logger.error(
-            "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-                + "or reduce the size of requests you send.");
-        return StatusUtils.INTERNAL_ERROR;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Can not parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
       }
-      // logDispatcher will serialize log, and set log size, and we will use the size after it
-      logManager.append(log);
-      Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
-      startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
-      sendLogRequest = buildSendLogRequest(log);
-      Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog) log).setPlan(plan);
+      plan.setIndex(logManager.getLastLogIndex() + 1);
+    }
 
-      startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
-      log.setCreateTime(System.nanoTime());
-      getLogDispatcher().offer(sendLogRequest);
-      Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+    // just like processPlanLocally, we need to check the size of log
+    if (log.serialize().capacity() + Integer.BYTES
+        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+              + "or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
     }
 
+    // assign term and index to the new log and append it
+    SendLogRequest sendLogRequest = logSequencer.sequence(log);
+
     try {
       AppendLogResult appendLogResult =
           waitAppendResult(
@@ -1116,6 +1101,7 @@ public abstract class RaftMember {
               sendLogRequest.getNewLeaderTerm());
       Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
           sendLogRequest.getLog().getCreateTime());
+      long startTime;
       switch (appendLogResult) {
         case OK:
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
@@ -1486,7 +1472,7 @@ public abstract class RaftMember {
     return term;
   }
 
-  private synchronized LogDispatcher getLogDispatcher() {
+  public synchronized LogDispatcher getLogDispatcher() {
     if (logDispatcher == null) {
       logDispatcher = new LogDispatcher(this);
     }
@@ -1598,7 +1584,7 @@ public abstract class RaftMember {
     return tsStatus;
   }
 
-  AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
+  public AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
     AppendEntryRequest request = new AppendEntryRequest();
     request.setTerm(term.get());
     if (serializeNow) {