You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:02 UTC
[iotdb] 02/09: add interface
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4edae81a88055677686fcc5a84774685648453ac
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 14:54:49 2021 +0800
add interface
---
.../log/sequencing/AsynchronousSequencer.java | 47 +++++++++++
.../iotdb/cluster/log/sequencing/LogSequencer.java | 42 ++++++++++
.../log/sequencing/SynchronousSequencer.java | 95 ++++++++++++++++++++++
.../cluster/server/member/DataGroupMember.java | 2 +
.../cluster/server/member/MetaGroupMember.java | 2 +
.../iotdb/cluster/server/member/RaftMember.java | 74 +++++++----------
6 files changed, 218 insertions(+), 44 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
new file mode 100644
index 0000000..1e089c6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+
+public class AsynchronousSequencer implements LogSequencer{
+
+ private RaftMember member;
+ private RaftLogManager logManager;
+
+ public AsynchronousSequencer(RaftMember member,
+ RaftLogManager logManager) {
+ this.member = member;
+ this.logManager = logManager;
+ }
+
+ @Override
+ public SendLogRequest sequence(Log log) {
+ return null;
+ }
+
+ @Override
+ public void setLogManager(RaftLogManager logManager) {
+ this.logManager = logManager;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java
new file mode 100644
index 0000000..b418162
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+
+/**
+ * LogSequencer assigns a unique index and associated term to a log entry and offers the entry to a
+ * LogDispatcher which will send the entry to followers.
+ */
+public interface LogSequencer {
+
+ /**
+ * assigns a unique index and associated term to a log entry and offers the entry to a
+ * LogDispatcher which will send the entry to followers.
+ *
+ * @param log a log entry that is not yet indexed.
+ * @return A SendLogRequest through which the caller can monitor the status of the sending entry.
+ */
+ SendLogRequest sequence(Log log);
+
+ void setLogManager(RaftLogManager logManager);
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
new file mode 100644
index 0000000..4327065
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
+ * thread.
+ */
+public class SynchronousSequencer implements LogSequencer {
+
+ private RaftMember member;
+ private RaftLogManager logManager;
+
+ public SynchronousSequencer(RaftMember member, RaftLogManager logManager) {
+ this.member = member;
+ this.logManager = logManager;
+ }
+
+ @Override
+ public SendLogRequest sequence(Log log) {
+ SendLogRequest sendLogRequest;
+
+ long startTime =
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
+ synchronized (logManager) {
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
+ startTime);
+
+ log.setCurrLogTerm(member.getTerm().get());
+ log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+
+ startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+
+ // logDispatcher will serialize log, and set log size, and we will use the size after it
+ logManager.append(log);
+ Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
+
+ startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
+ sendLogRequest = buildSendLogRequest(log);
+ Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
+
+ startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+ log.setCreateTime(System.nanoTime());
+ member.getLogDispatcher().offer(sendLogRequest);
+ Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+ }
+ return sendLogRequest;
+ }
+
+ @Override
+ public void setLogManager(RaftLogManager logManager) {
+ this.logManager = logManager;
+ }
+
+ private SendLogRequest buildSendLogRequest(Log log) {
+ AtomicInteger voteCounter = new AtomicInteger(member.getAllNodes().size() / 2);
+ AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+ AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
+
+ long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
+ AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false);
+ Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
+
+ return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index f4a750b..6b66216 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
+import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
@@ -210,6 +211,7 @@ public class DataGroupMember extends RaftMember {
logManager =
new FilePartitionedSnapshotLogManager(
dataLogApplier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
+ logSequencer = new SynchronousSequencer(this, logManager);
initPeerMap();
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index bacacab..ef3ca8b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
import org.apache.iotdb.cluster.partition.NodeAdditionResult;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
@@ -265,6 +266,7 @@ public class MetaGroupMember extends RaftMember {
// committed logs are applied to the state machine (the IoTDB instance) through the applier
LogApplier metaLogApplier = new MetaLogApplier(this);
logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
+ logSequencer = new SynchronousSequencer(this, logManager);
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 774e269..8cb88b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -244,6 +245,8 @@ public abstract class RaftMember {
*/
protected PlanExecutor localExecutor;
+ protected LogSequencer logSequencer;
+
protected RaftMember() {}
protected RaftMember(
@@ -328,6 +331,9 @@ public abstract class RaftMember {
this.logManager.close();
}
this.logManager = logManager;
+ if (logSequencer != null) {
+ logSequencer.setLogManager(logManager);
+ }
}
/**
@@ -1060,54 +1066,33 @@ public abstract class RaftMember {
if (readOnly) {
return StatusUtils.NODE_READ_ONLY;
}
- // assign term and index to the new log and append it
- SendLogRequest sendLogRequest;
- long startTime =
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
Log log;
- synchronized (logManager) {
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
- startTime);
-
- if (plan instanceof LogPlan) {
- try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
- } catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
- return StatusUtils.PARSE_LOG_ERROR;
- }
- } else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
- plan.setIndex(logManager.getLastLogIndex() + 1);
- }
- log.setCurrLogTerm(getTerm().get());
- log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
- startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
- // just like processPlanLocally,we need to check the size of log
- if (log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
- logger.error(
- "Log cannot fit into buffer, please increase raft_log_buffer_size;"
- + "or reduce the size of requests you send.");
- return StatusUtils.INTERNAL_ERROR;
+ if (plan instanceof LogPlan) {
+ try {
+ log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ } catch (UnknownLogTypeException e) {
+ logger.error("Can not parse LogPlan {}", plan, e);
+ return StatusUtils.PARSE_LOG_ERROR;
}
- // logDispatcher will serialize log, and set log size, and we will use the size after it
- logManager.append(log);
- Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
- startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
- sendLogRequest = buildSendLogRequest(log);
- Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
+ } else {
+ log = new PhysicalPlanLog();
+ ((PhysicalPlanLog) log).setPlan(plan);
+ plan.setIndex(logManager.getLastLogIndex() + 1);
+ }
- startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
- log.setCreateTime(System.nanoTime());
- getLogDispatcher().offer(sendLogRequest);
- Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+ // just like processPlanLocally,we need to check the size of log
+ if (log.serialize().capacity() + Integer.BYTES
+ >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ logger.error(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "or reduce the size of requests you send.");
+ return StatusUtils.INTERNAL_ERROR;
}
+ // assign term and index to the new log and append it
+ SendLogRequest sendLogRequest = logSequencer.sequence(log);
+
try {
AppendLogResult appendLogResult =
waitAppendResult(
@@ -1116,6 +1101,7 @@ public abstract class RaftMember {
sendLogRequest.getNewLeaderTerm());
Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
sendLogRequest.getLog().getCreateTime());
+ long startTime;
switch (appendLogResult) {
case OK:
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
@@ -1486,7 +1472,7 @@ public abstract class RaftMember {
return term;
}
- private synchronized LogDispatcher getLogDispatcher() {
+ public synchronized LogDispatcher getLogDispatcher() {
if (logDispatcher == null) {
logDispatcher = new LogDispatcher(this);
}
@@ -1598,7 +1584,7 @@ public abstract class RaftMember {
return tsStatus;
}
- AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
+ public AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
AppendEntryRequest request = new AppendEntryRequest();
request.setTerm(term.get());
if (serializeNow) {