You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/17 07:57:46 UTC
[iotdb] branch native_raft updated: add sliding window log appender
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new c561b93cf8 add sliding window log appender
c561b93cf8 is described below
commit c561b93cf84831bb49cbd55bc4d8e9c569e9685f
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Feb 17 15:59:04 2023 +0800
add sliding window log appender
---
.../log/appender/SlidingWindowLogAppender.java | 269 +++++++++++++++++++++
1 file changed, 269 insertions(+)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
new file mode 100644
index 0000000000..da366f0330
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log.appender;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.consensus.natraft.Utils.Response;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlidingWindowLogAppender implements LogAppender {
+
+ private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class);
+
+ // number of slots in logWindow and prevTerms; taken from config.getMaxNumOfLogsInMem()
+ private int windowCapacity;
+ // raised to (position + 1) when an entry is stored beyond it and lowered by checkLogNext when
+ // trailing entries are dropped; reset to 0 by reset()
+ private int windowLength = 0;
+ // buffered entries; appendEntry stores an entry at position
+ // (entry.getCurrLogIndex() - logManager.getLastLogIndex() - 1), null marks an empty slot
+ private Entry[] logWindow;
+ // log index that precedes window position 0; refreshed from logManager.getLastLogIndex()
+ private long firstPosPrevIndex;
+ // prevTerms[i] is the prevLogTerm that was supplied together with the entry stored at position i
+ private long[] prevTerms;
+
+ private RaftMember member;
+ // also used as the monitor that guards window and log modifications
+ private RaftLogManager logManager;
+ private RaftConfig config;
+
+ /**
+  * Creates an appender whose window holds {@code config.getMaxNumOfLogsInMem()} entries and
+  * aligns it with the current end of the member's log.
+  *
+  * @param member the raft member whose log manager receives the flushed entries
+  * @param config source of the window capacity and of the back-pressure settings
+  */
+ public SlidingWindowLogAppender(RaftMember member, RaftConfig config) {
+   this.member = member;
+   this.config = config;
+   this.logManager = member.getLogManager();
+   this.windowCapacity = config.getMaxNumOfLogsInMem();
+   // the arrays must exist before reset() runs because reset() touches prevTerms
+   this.logWindow = new Entry[this.windowCapacity];
+   this.prevTerms = new long[this.windowCapacity];
+   reset();
+ }
+
+ /**
+ * After an entry has been inserted into the window at {@code pos}, compare it with its
+ * neighbours: {@code checkLogPrev} handles the entry right before it and {@code checkLogNext}
+ * handles the entries after it. Neighbours whose terms do not match are removed from the window.
+ *
+ * @param pos window position of the entry that was just inserted
+ */
+ private void checkLog(int pos) {
+ checkLogPrev(pos);
+ checkLogNext(pos);
+ }
+
+ /**
+  * Drops the entry directly before {@code pos} when its term differs from the previous-log term
+  * recorded for {@code pos}. Position 0 has no predecessor inside the window, so nothing is
+  * checked for it.
+  *
+  * @param pos window position of the entry that was just inserted
+  */
+ private void checkLogPrev(int pos) {
+   long expectedTerm = prevTerms[pos];
+   if (pos <= 0) {
+     return;
+   }
+   int prevPos = pos - 1;
+   Entry predecessor = logWindow[prevPos];
+   boolean termMismatch = predecessor != null && predecessor.getCurrLogTerm() != expectedTerm;
+   if (termMismatch) {
+     logWindow[prevPos] = null;
+   }
+ }
+
+ /**
+  * Compares the term of the entry at {@code pos} with the previous-log term recorded for the next
+  * position. On a mismatch, the contiguous run of entries that starts right after {@code pos} is
+  * removed; the run ends at the first empty slot. When the removed run contained the slot at
+  * {@code windowLength - 1}, {@code windowLength} is shrunk to {@code pos + 1}.
+  *
+  * @param pos window position of the entry that was just inserted
+  */
+ private void checkLogNext(int pos) {
+   Entry current = logWindow[pos];
+   if (pos >= windowCapacity - 1) {
+     // the last slot has no successor to validate
+     return;
+   }
+   if (prevTerms[pos + 1] == current.getCurrLogTerm()) {
+     return;
+   }
+   for (int i = pos + 1; i < windowCapacity && logWindow[i] != null; i++) {
+     logWindow[i] = null;
+     if (i == windowLength - 1) {
+       windowLength = pos + 1;
+     }
+   }
+ }
+
+ /**
+  * Flush window range [0, flushPos) into the LogManager, where flushPos is the first null position
+  * in the window (or the window capacity when every slot is occupied).
+  *
+  * <p>The append is delayed while the distance between the committed and the applied index is
+  * above {@code config.getUnAppliedRaftLogNumForRejectThreshold()}. The wait is abandoned, with
+  * {@code result.status} set to {@code RESPONSE_TOO_BUSY}, when it lasts longer than
+  * {@code config.getMaxWaitingTimeWhenInsertBlocked()} or when the thread is interrupted.
+  *
+  * @param result response object whose status, last log index and last log term are filled in
+  * @param leaderCommit the leader's commit index, handed to {@code logManager.maybeAppend}
+  * @return the value returned by {@code logManager.maybeAppend}, or -1 when the wait was abandoned
+  */
+ private long flushWindow(AppendEntryResult result, long leaderCommit) {
+   long windowPrevLogIndex = firstPosPrevIndex;
+   long windowPrevLogTerm = prevTerms[0];
+
+   int flushPos = 0;
+   for (; flushPos < windowCapacity; flushPos++) {
+     if (logWindow[flushPos] == null) {
+       break;
+     }
+   }
+
+   // flush [0, flushPos); take a snapshot instead of a view of logWindow, because the window
+   // array is shifted in place right after the append
+   List<Entry> logs = Arrays.asList(Arrays.copyOfRange(logWindow, 0, flushPos));
+   logger.debug(
+       "Flushing {} entries to log, first {}, last {}",
+       logs.size(),
+       logs.get(0),
+       logs.get(logs.size() - 1));
+
+   long startWaitingTime = System.currentTimeMillis();
+   long success;
+   while (true) {
+     // TODO: Consider memory footprint to execute a precise rejection
+     if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
+         <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
+       synchronized (logManager) {
+         success =
+             logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
+       }
+       break;
+     }
+     try {
+       TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+     } catch (InterruptedException e) {
+       // Restore the flag and stop waiting: with the flag set every further sleep would throw
+       // immediately, turning this loop into a busy spin that never reaches the timeout check.
+       Thread.currentThread().interrupt();
+       logger.warn("Interrupted while waiting for the log to be applied, giving up the flush");
+       result.status = Response.RESPONSE_TOO_BUSY;
+       return -1;
+     }
+     if (System.currentTimeMillis() - startWaitingTime
+         > config.getMaxWaitingTimeWhenInsertBlocked()) {
+       result.status = Response.RESPONSE_TOO_BUSY;
+       return -1;
+     }
+   }
+   if (success != -1) {
+     moveWindowRightward(flushPos);
+   }
+   result.status = Response.RESPONSE_STRONG_ACCEPT;
+   result.setLastLogIndex(firstPosPrevIndex);
+   result.setLastLogTerm(logManager.getLastLogTerm());
+   return success;
+ }
+
+ /**
+  * Discards the first {@code step} window positions: the remaining entries and their recorded
+  * previous-log terms are copied to the front of the arrays, the {@code step} entry slots at the
+  * end are emptied, and {@code firstPosPrevIndex} is refreshed from the log manager.
+  *
+  * @param step number of leading positions to drop
+  */
+ private void moveWindowRightward(int step) {
+   int remaining = windowCapacity - step;
+   System.arraycopy(logWindow, step, logWindow, 0, remaining);
+   System.arraycopy(prevTerms, step, prevTerms, 0, remaining);
+   // the tail slots no longer hold valid entries after the shift
+   Arrays.fill(logWindow, remaining, windowCapacity, null);
+   firstPosPrevIndex = logManager.getLastLogIndex();
+ }
+
+ /**
+  * Makes room for {@code step} positions at the front of the window: the buffered entries and
+  * their recorded previous-log terms are copied {@code step} slots toward the end of the arrays
+  * (entries pushed past the capacity are lost), the {@code step} vacated leading entry slots are
+  * emptied, and {@code firstPosPrevIndex} is refreshed from the log manager.
+  *
+  * @param step number of positions to shift by; values outside [0, windowCapacity] are clamped
+  */
+ private void moveWindowLeftward(int step) {
+   // clamp so that a step larger than the window simply empties it instead of making
+   // System.arraycopy throw on an out-of-range destination
+   int shift = Math.min(Math.max(step, 0), windowCapacity);
+   int kept = windowCapacity - shift;
+   System.arraycopy(logWindow, 0, logWindow, shift, kept);
+   System.arraycopy(prevTerms, 0, prevTerms, shift, kept);
+   // only the vacated leading slots are cleared; the shifted entries must survive
+   for (int i = 0; i < shift; i++) {
+     logWindow[i] = null;
+   }
+   firstPosPrevIndex = logManager.getLastLogIndex();
+ }
+
+ @Override
+ public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> entries) {
+ if (entries.isEmpty()) {
+ return new AppendEntryResult(Response.RESPONSE_AGREE)
+ .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+ }
+
+ AppendEntryResult result = null;
+ for (Entry entry : entries) {
+ result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, entry);
+
+ if (result.status != Response.RESPONSE_AGREE
+ && result.status != Response.RESPONSE_STRONG_ACCEPT
+ && result.status != Response.RESPONSE_WEAK_ACCEPT) {
+ return result;
+ }
+ request.prevLogIndex = entry.getCurrLogIndex();
+ request.prevLogTerm = entry.getCurrLogTerm();
+ }
+
+ return result;
+ }
+
+
+ /**
+  * Handles a single entry according to its position relative to the end of the log:
+  *
+  * <ul>
+  *   <li>at or before the last log index: the entry is handed directly to
+  *       {@code logManager.maybeAppend} and the window is shifted;
+  *   <li>inside the window: the entry is buffered, its neighbours are validated, and the window
+  *       is flushed when the entry landed on position 0, otherwise the answer is
+  *       {@code RESPONSE_WEAK_ACCEPT};
+  *   <li>beyond the window: the entry is rejected with {@code RESPONSE_OUT_OF_WINDOW}.
+  * </ul>
+  *
+  * @param prevLogIndex index of the entry preceding {@code entry}
+  * @param prevLogTerm term of the entry preceding {@code entry}
+  * @param leaderCommit the leader's commit index
+  * @param entry the entry to append
+  * @return the result describing how the entry was handled
+  */
+ private AppendEntryResult appendEntry(
+     long prevLogIndex, long prevLogTerm, long leaderCommit, Entry entry) {
+   long appendedPos = 0;
+
+   AppendEntryResult result = new AppendEntryResult();
+   synchronized (logManager) {
+     int windowPos = (int) (entry.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
+     if (windowPos < 0) {
+       // the new entry may replace an appended entry
+       appendedPos =
+           logManager.maybeAppend(
+               prevLogIndex, prevLogTerm, leaderCommit, Collections.singletonList(entry));
+       result.status = Response.RESPONSE_STRONG_ACCEPT;
+       result.setLastLogIndex(logManager.getLastLogIndex());
+       result.setLastLogTerm(logManager.getLastLogTerm());
+       moveWindowLeftward(-windowPos);
+     } else if (windowPos < windowCapacity) {
+       // the new entry falls into the window
+       logWindow[windowPos] = entry;
+       prevTerms[windowPos] = prevLogTerm;
+       if (windowLength < windowPos + 1) {
+         windowLength = windowPos + 1;
+       }
+       checkLog(windowPos);
+       if (windowPos == 0) {
+         appendedPos = flushWindow(result, leaderCommit);
+       } else {
+         result.status = Response.RESPONSE_WEAK_ACCEPT;
+       }
+
+     } else {
+       result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
+       result.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+       return result;
+     }
+   }
+
+   // flushWindow also returns -1 when it gave up waiting and already reported
+   // RESPONSE_TOO_BUSY; that status must not be masked as a log mismatch
+   if (appendedPos == -1 && result.status != Response.RESPONSE_TOO_BUSY) {
+     // the incoming log points to an illegal position, reject it
+     result.status = Response.RESPONSE_LOG_MISMATCH;
+   }
+   return result;
+ }
+
+ /**
+  * Empties the window and re-aligns it with the end of the log: both arrays are re-created,
+  * {@code windowLength} returns to 0, and the previous-log index and term of position 0 are
+  * taken from the log manager's last log index and term.
+  */
+ @Override
+ public void reset() {
+   // allocate first: assigning prevTerms[0] before re-creating the array would lose the value
+   logWindow = new Entry[windowCapacity];
+   prevTerms = new long[windowCapacity];
+   windowLength = 0;
+   this.firstPosPrevIndex = logManager.getLastLogIndex();
+   if (windowCapacity > 0) {
+     this.prevTerms[0] = logManager.getLastLogTerm();
+   }
+ }
+
+ /** A {@link LogAppenderFactory} that creates {@link SlidingWindowLogAppender} instances. */
+ public static class Factory implements LogAppenderFactory {
+
+ /**
+ * Creates a new {@link SlidingWindowLogAppender} for the given member and configuration.
+ *
+ * @param member passed to the appender's constructor
+ * @param config passed to the appender's constructor
+ * @return the newly created appender
+ */
+ @Override
+ public LogAppender create(RaftMember member, RaftConfig config) {
+ return new SlidingWindowLogAppender(member, config);
+ }
+ }
+}