You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2023/08/07 15:51:19 UTC
[ignite-3] branch main updated: IGNITE-19960 Sync with JRaft repo - Fixes #2321.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 606f17fc3c IGNITE-19960 Sync with JRaft repo - Fixes #2321.
606f17fc3c is described below
commit 606f17fc3c0ffa71c4e2caed9bd228735516b404
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Mon Aug 7 18:50:54 2023 +0300
IGNITE-19960 Sync with JRaft repo - Fixes #2321.
Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 28 +++---
.../ignite/raft/jraft/core/FSMCallerImpl.java | 6 +-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 91 +++++++++++-------
.../raft/jraft/core/ReadOnlyServiceImpl.java | 53 +++++++----
.../apache/ignite/raft/jraft/core/Replicator.java | 19 +++-
.../apache/ignite/raft/jraft/entity/LogEntry.java | 25 +++++
.../ignite/raft/jraft/entity/ReadIndexStatus.java | 7 ++
.../OverloadException.java} | 41 ++++----
.../ignite/raft/jraft/option/ApplyTaskMode.java | 28 ++++++
.../ignite/raft/jraft/option/NodeOptions.java | 71 ++++++++------
.../ignite/raft/jraft/option/RaftOptions.java | 19 ++++
.../apache/ignite/raft/jraft/rpc/RpcRequests.java | 2 +-
.../raft/jraft/rpc/impl/AbstractClientService.java | 2 +-
.../ignite/raft/jraft/storage/LogManager.java | 8 ++
.../raft/jraft/storage/impl/LogManagerImpl.java | 106 +++++++++++----------
.../raft/jraft/storage/impl/RocksDBLogStorage.java | 34 ++++---
.../storage/snapshot/SnapshotExecutorImpl.java | 2 +-
.../snapshot/local/LocalSnapshotStorage.java | 2 +-
.../org/apache/ignite/raft/jraft/util/Utils.java | 11 +++
.../raft/jraft/core/ReadOnlyServiceTest.java | 48 ++++++++++
.../ignite/raft/jraft/entity/LogEntryTest.java | 36 +++++++
.../raft/jraft/storage/impl/LogManagerTest.java | 7 ++
.../ignite/raft/jraft/core/ExpectClosure.java | 29 ++++--
.../ignite/raft/jraft/core/MockStateMachine.java | 4 +-
24 files changed, 472 insertions(+), 207 deletions(-)
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 72e5dfa847..8d7cbc77a0 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -1478,7 +1478,7 @@ public class ItNodeTest {
assertTrue(cluster.stop(leader.getNodeId().getPeerId()));
assertFalse(followers.isEmpty());
- sendTestTaskAndWait("follower apply ", followers.get(0), -1); // Should fail, because no leader.
+ int success = sendTestTaskAndWait("follower apply ", followers.get(0), 10, -1); // Should fail, because no leader.
stopBlockingMessagesOnFollowers(followers);
@@ -1517,7 +1517,7 @@ public class ItNodeTest {
cluster.ensureSame();
for (MockStateMachine fsm : cluster.getFsms())
- assertEquals(30, fsm.getLogs().size());
+ assertEquals(30 + success, fsm.getLogs().size());
}
@Test
@@ -3178,7 +3178,7 @@ public class ItNodeTest {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
peers.add(peer0);
- cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
+ cluster = new TestCluster("testChangePeersAddMultiNodes", dataPath, Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
@@ -3345,7 +3345,7 @@ public class ItNodeTest {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
- cluster = new TestCluster("unittest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
+ cluster = new TestCluster("testChangePeersChaosWithSnapshot", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
assertTrue(cluster.start(peers.get(0), false, 2));
// start other peers
for (int i = 1; i < 10; i++) {
@@ -3390,7 +3390,7 @@ public class ItNodeTest {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
- cluster = new TestCluster("unittest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
+ cluster = new TestCluster("testChangePeersChaosWithoutSnapshot", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
assertTrue(cluster.start(peers.get(0), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
@@ -3427,8 +3427,8 @@ public class ItNodeTest {
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms()) {
- assertTrue(fsm.getLogs().size() >= tasks);
- assertTrue(fsm.getLogs().size() - tasks < 100);
+ final int logSize = fsm.getLogs().size();
+ assertTrue(logSize >= tasks, "logSize=" + logSize);
}
}
@@ -3437,7 +3437,7 @@ public class ItNodeTest {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
- cluster = new TestCluster("unittest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
+ cluster = new TestCluster("testChangePeersChaosApplyTasks", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
assertTrue(cluster.start(peers.get(0), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
@@ -3503,7 +3503,6 @@ public class ItNodeTest {
for (MockStateMachine fsm : cluster.getFsms()) {
int logSize = fsm.getLogs().size();
assertTrue(logSize >= 5000 * threads, "logSize= " + logSize);
- assertTrue(logSize - 5000 * threads < 100, "logSize= " + logSize);
}
}
@@ -3904,20 +3903,17 @@ public class ItNodeTest {
}
@SuppressWarnings("SameParameterValue")
- private void sendTestTaskAndWait(String prefix, Node node, int code) throws InterruptedException {
- sendTestTaskAndWait(prefix, node, 10, code);
- }
-
- @SuppressWarnings("SameParameterValue")
- private void sendTestTaskAndWait(String prefix, Node node, int amount,
+ private int sendTestTaskAndWait(String prefix, Node node, int amount,
int code) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
+ final AtomicInteger successCount = new AtomicInteger(0);
for (int i = 0; i < amount; i++) {
ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes(UTF_8));
- Task task = new Task(data, new ExpectClosure(code, null, latch));
+ Task task = new Task(data, new ExpectClosure(code, null, latch, successCount));
node.apply(task);
}
waitLatch(latch);
+ return successCount.get();
}
private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index 9456e17565..4d4b89f5c7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -230,11 +230,7 @@ public class FSMCallerImpl implements FSMCaller {
return false;
}
- if (!this.taskQueue.tryPublishEvent(tpl)) {
- setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
- "FSMCaller is overload.")));
- return false;
- }
+ this.taskQueue.publishEvent(tpl);
return true;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 15cc2c0adb..1d83ea60e7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -78,6 +79,7 @@ import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.entity.UserLog;
import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
import org.apache.ignite.raft.jraft.error.LogNotFoundException;
+import org.apache.ignite.raft.jraft.error.OverloadException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.BallotBoxOptions;
@@ -125,7 +127,6 @@ import org.apache.ignite.raft.jraft.util.RepeatedTimer;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
-import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.ThreadId;
import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -140,9 +141,6 @@ public class NodeImpl implements Node, RaftServerService {
public static final Status LEADER_STEPPED_DOWN = new Status(RaftError.EPERM, "Leader stepped down.");
- // Max retry times when applying tasks.
- private static final int MAX_APPLY_RETRY_TIMES = 3;
-
private volatile HybridClock clock;
/**
@@ -1602,7 +1600,8 @@ public class NodeImpl implements Node, RaftServerService {
st.setError(RaftError.EBUSY, "Is transferring leadership.");
}
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
- final List<Closure> dones = tasks.stream().map(ele -> ele.done).collect(Collectors.toList());
+ final List<Closure> dones = tasks.stream().map(ele -> ele.done)
+ .filter(Objects::nonNull).collect(Collectors.toList());
Utils.runInThread(this.getOptions().getCommonExecutor(), () -> {
for (final Closure done : dones) {
done.run(st);
@@ -1857,36 +1856,30 @@ public class NodeImpl implements Node, RaftServerService {
final LogEntry entry = new LogEntry();
entry.setData(task.getData());
- int retryTimes = 0;
- try {
- final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
- event.reset();
- event.nodeId = getNodeId();
- event.done = task.getDone();
- event.entry = entry;
- event.expectedTerm = task.getExpectedTerm();
- };
- while (true) {
- if (this.applyQueue.tryPublishEvent(translator)) {
- break;
- }
- else {
- retryTimes++;
- if (retryTimes > MAX_APPLY_RETRY_TIMES) {
- Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(),
- new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
- LOG.warn("Node {} applyQueue is overload.", getNodeId());
- this.metrics.recordTimes("apply-task-overload-times", 1);
- return;
+
+ final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
+ event.reset();
+ event.nodeId = getNodeId();
+ event.done = task.getDone();
+ event.entry = entry;
+ event.expectedTerm = task.getExpectedTerm();
+ };
+ switch (this.options.getApplyTaskMode()) {
+ case Blocking:
+ this.applyQueue.publishEvent(translator);
+ break;
+ case NonBlocking:
+ default:
+ if (!this.applyQueue.tryPublishEvent(translator)) {
+ String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize();
+ Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.EBUSY, errorMsg));
+ LOG.warn("Node {} applyQueue is overload.", getNodeId());
+ this.metrics.recordTimes("apply-task-overload-times", 1);
+ if (task.getDone() == null) {
+ throw new OverloadException(errorMsg);
}
- ThreadHelper.onSpinWait();
- }
}
-
- }
- catch (final Exception e) {
- LOG.error("Fail to apply task.", e);
- Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.EPERM, "Node is down."));
+ break;
}
}
@@ -2153,6 +2146,7 @@ public class NodeImpl implements Node, RaftServerService {
final long startMs = Utils.monotonicMs();
this.writeLock.lock();
final int entriesCount = Utils.size(request.entriesList());
+ boolean success = false;
try {
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
@@ -2257,6 +2251,24 @@ public class NodeImpl implements Node, RaftServerService {
return respBuilder.build();
}
+ // Fast check whether the log manager is overloaded.
+ if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
+ LOG.warn("Node {} received AppendEntriesRequest but log manager is busy.", getNodeId());
+
+ AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
+ .appendEntriesResponse()
+ .success(false)
+ .errorCode(RaftError.EBUSY.getNumber())
+ .errorMsg(String.format("Node %s:%s log manager is busy.", this.groupId, this.serverId))
+ .term(this.currTerm);
+
+ if (request.timestamp() != null) {
+ rb.timestampLong(clock.update(request.timestamp()).longValue());
+ }
+
+ return rb.build();
+ }
+
// Parse request
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
@@ -2296,14 +2308,23 @@ public class NodeImpl implements Node, RaftServerService {
this.logManager.appendEntries(entries, closure);
// update configuration after _log_manager updated its memory status
checkAndSetConfiguration(true);
+ success = true;
return null;
}
finally {
if (doUnlock) {
this.writeLock.unlock();
}
- this.metrics.recordLatency("handle-append-entries", Utils.monotonicMs() - startMs);
- this.metrics.recordSize("handle-append-entries-count", entriesCount);
+ final long processLatency = Utils.monotonicMs() - startMs;
+ if (entriesCount == 0) {
+ this.metrics.recordLatency("handle-heartbeat-requests", processLatency);
+ } else {
+ this.metrics.recordLatency("handle-append-entries", processLatency);
+ }
+ if (success) {
+ // Don't record heartbeat requests in the size stats.
+ this.metrics.recordSize("handle-append-entries-count", entriesCount);
+ }
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index e6aae9f426..f97eea2ae2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -41,6 +41,7 @@ import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.ReadIndexState;
import org.apache.ignite.raft.jraft.entity.ReadIndexStatus;
+import org.apache.ignite.raft.jraft.error.OverloadException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.RaftOptions;
@@ -53,15 +54,12 @@ import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Bytes;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
-import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.Utils;
/**
* Read-only service implementation.
*/
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {
- private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
-
/**
* Disruptor to run readonly service.
*/
@@ -194,10 +192,19 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
notifySuccess(readIndexStatus);
}
else {
- // Not applied, add it to pending-notify cache.
- ReadOnlyServiceImpl.this.pendingNotifyStatus
- .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
- .add(readIndexStatus);
+ if (readIndexStatus.isOverMaxReadIndexLag(
+ ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex(),
+ ReadOnlyServiceImpl.this.raftOptions.getMaxReadIndexLag())
+ ) {
+ ReadOnlyServiceImpl.this.lock.unlock();
+ doUnlock = false;
+ notifyFail(new Status(-1, "Fail to run ReadIndex task, the gap of current node's apply index between leader's commit index over maxReadIndexLag"));
+ } else {
+ // Not applied, add it to pending-notify cache.
+ ReadOnlyServiceImpl.this.pendingNotifyStatus
+ .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
+ .add(readIndexStatus);
+ }
}
}
finally {
@@ -329,22 +336,23 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
event.requestContext = new Bytes(reqCtx);
event.startTime = Utils.monotonicMs();
};
- int retryTimes = 0;
- while (true) {
- if (this.readIndexQueue.tryPublishEvent(translator)) {
+
+ switch (this.node.getOptions().getApplyTaskMode()) {
+ case Blocking:
+ this.readIndexQueue.publishEvent(translator);
break;
- }
- else {
- retryTimes++;
- if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
- Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), closure,
- new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
+ case NonBlocking:
+ default:
+ if (!this.readIndexQueue.tryPublishEvent(translator)) {
+ String errorMsg = "Node is busy, has too many read-index requests, queue is full and bufferSize="+ this.readIndexQueue.getBufferSize();
+ Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), closure, new Status(RaftError.EBUSY, errorMsg));
this.nodeMetrics.recordTimes("read-index-overload-times", 1);
LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
- return;
- }
- ThreadHelper.onSpinWait();
- }
+ if(closure == null) {
+ throw new OverloadException(errorMsg);
+ }
+ }
+ break;
}
}
catch (final Exception e) {
@@ -419,6 +427,11 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
return this.pendingNotifyStatus;
}
+ @OnlyForTest
+ RaftOptions getRaftOptions() {
+ return this.raftOptions;
+ }
+
private void reportError(final ReadIndexStatus status, final Status st) {
final long nowMs = Utils.monotonicMs();
final List<ReadIndexState> states = status.getStates();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 9e631d8678..218f79e038 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -56,6 +56,7 @@ import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest;
@@ -596,8 +597,8 @@ public class Replicator implements ThreadId.OnError {
return;
}
boolean doUnlock = true;
- if (!rpcService.connect(options.getPeerId())) {
- LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", options.getPeerId());
+ if (!this.rpcService.connect(this.options.getPeerId())) {
+ LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", this.options.getPeerId());
block(Utils.nowMs(), RaftError.EHOSTDOWN.getNumber());
return;
}
@@ -1444,6 +1445,20 @@ public class Replicator implements ThreadId.OnError {
}
r.consecutiveErrorTimes = 0;
if (!response.success()) {
+ // Target node is busy, sleep for a while.
+ if (response.errorCode() == RaftError.EBUSY.getNumber()) {
+ if (isLogDebugEnabled) {
+ sb.append(" is busy, sleep, errorMsg='") //
+ .append(response.errorMsg()).append("'");
+ LOG.debug(sb.toString());
+ }
+ r.resetInflights();
+ r.setState(State.Probe);
+ // unlock in block
+ r.block(startTimeMs, status.getCode());
+ return false;
+ }
+
if (response.term() > r.options.getTerm()) {
if (isLogDebugEnabled) {
sb.append(" fail, greater term ") //
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
index 8022e24bd1..c4826b3a64 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
@@ -152,10 +152,35 @@ public class LogEntry implements Checksum {
this.oldPeers = oldPeers;
}
+ /**
+ * Returns the log data. The buffer is not read-only; you SHOULD take care of its modification and
+ * thread-safety yourself.
+ *
+ * @return the log data
+ */
public ByteBuffer getData() {
return this.data;
}
+ /**
+ * Creates a new byte buffer whose content is a shared subsequence of this log entry's data
+ * buffer's content.
+ *
+ * @return The new byte buffer
+ */
+ public ByteBuffer sliceData() {
+ return this.data != null ? this.data.slice() : null;
+ }
+
+ /**
+ * Creates a new, read-only byte buffer that shares this log entry's data buffer's content.
+ *
+ * @return the new, read-only byte buffer
+ */
+ public ByteBuffer getReadOnlyData() {
+ return this.data != null ? this.data.asReadOnlyBuffer() : null;
+ }
+
public void setData(final ByteBuffer data) {
this.data = data;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
index ae7de63c54..23f772175c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
@@ -39,6 +39,13 @@ public class ReadIndexStatus {
return appliedIndex >= this.index;
}
+ public boolean isOverMaxReadIndexLag(long applyIndex, int maxReadIndexLag) {
+ if (maxReadIndexLag < 0) {
+ return false;
+ }
+ return this.index - applyIndex > maxReadIndexLag;
+ }
+
public long getIndex() {
return index;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/OverloadException.java
similarity index 50%
copy from modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
copy to modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/OverloadException.java
index ae7de63c54..29efb4eb7c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/OverloadException.java
@@ -14,41 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.entity;
-
-import java.util.List;
-import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
+package org.apache.ignite.raft.jraft.error;
/**
- * ReadIndex requests statuses.
+ * Thrown when the node is overloaded.
*/
-public class ReadIndexStatus {
+public class OverloadException extends JRaftException {
- private final ReadIndexRequest request; // raw request
- private final List<ReadIndexState> states; // read index requests in batch.
- private final long index; // committed log index.
+ /**
+ * Serialization version UID.
+ */
+ private static final long serialVersionUID = -5505054326197103575L;
- public ReadIndexStatus(List<ReadIndexState> states, ReadIndexRequest request, long index) {
+ public OverloadException() {
super();
- this.index = index;
- this.request = request;
- this.states = states;
}
- public boolean isApplied(long appliedIndex) {
- return appliedIndex >= this.index;
+ public OverloadException(final String message, final Throwable cause, final boolean enableSuppression,
+ final boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
}
- public long getIndex() {
- return index;
+ public OverloadException(final String message, final Throwable cause) {
+ super(message, cause);
}
- public ReadIndexRequest getRequest() {
- return request;
+ public OverloadException(final String message) {
+ super(message);
}
- public List<ReadIndexState> getStates() {
- return states;
+ public OverloadException(final Throwable cause) {
+ super(cause);
}
-
-}
\ No newline at end of file
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ApplyTaskMode.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ApplyTaskMode.java
new file mode 100644
index 0000000000..9925d2bc2b
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ApplyTaskMode.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.option;
+
+/**
+ * Apply task in blocking or non-blocking mode.
+ *
+ */
+public enum ApplyTaskMode {
+ // Blocking mode is strongly discouraged because blocking code is forbidden in Ignite worker threads.
+ // It is kept mainly to avoid diverging from the original JRaft implementation.
+ Blocking,
+ NonBlocking
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 678dc61766..b674b9b16d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -253,6 +253,11 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
/** */
private List<Stripe> logStripes;
+ /**
+ * Apply task in blocking or non-blocking mode, ApplyTaskMode.NonBlocking by default.
+ */
+ private ApplyTaskMode applyTaskMode = ApplyTaskMode.NonBlocking;
+
public NodeOptions() {
raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
}
@@ -279,7 +284,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
* @return {@code true} if shared pools mode is in use.
*/
public boolean isSharedPools() {
- return sharedPools;
+ return this.sharedPools;
}
/**
@@ -289,6 +294,14 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
this.sharedPools = sharedPools;
}
+ public ApplyTaskMode getApplyTaskMode() {
+ return this.applyTaskMode;
+ }
+
+ public void setApplyTaskMode(final ApplyTaskMode applyTaskMode) {
+ this.applyTaskMode = applyTaskMode;
+ }
+
/**
* Service factory.
*/
@@ -374,7 +387,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public int getElectionPriority() {
- return electionPriority;
+ return this.electionPriority;
}
public void setElectionPriority(int electionPriority) {
@@ -382,7 +395,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public int getDecayPriorityGap() {
- return decayPriorityGap;
+ return this.decayPriorityGap;
}
public void setDecayPriorityGap(int decayPriorityGap) {
@@ -423,7 +436,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public int getSnapshotLogIndexMargin() {
- return snapshotLogIndexMargin;
+ return this.snapshotLogIndexMargin;
}
public void setSnapshotLogIndexMargin(int snapshotLogIndexMargin) {
@@ -507,11 +520,11 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public ExecutorService getCommonExecutor() {
- return commonExecutor;
+ return this.commonExecutor;
}
public FixedThreadsExecutorGroup getStripedExecutor() {
- return stripedExecutor;
+ return this.stripedExecutor;
}
public void setStripedExecutor(FixedThreadsExecutorGroup stripedExecutor) {
@@ -519,7 +532,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public Scheduler getScheduler() {
- return scheduler;
+ return this.scheduler;
}
public void setScheduler(Scheduler scheduler) {
@@ -527,7 +540,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public Timer getElectionTimer() {
- return electionTimer;
+ return this.electionTimer;
}
public void setElectionTimer(Timer electionTimer) {
@@ -535,7 +548,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public Timer getVoteTimer() {
- return voteTimer;
+ return this.voteTimer;
}
public void setVoteTimer(Timer voteTimer) {
@@ -543,7 +556,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public Timer getSnapshotTimer() {
- return snapshotTimer;
+ return this.snapshotTimer;
}
public void setSnapshotTimer(Timer snapshotTimer) {
@@ -551,7 +564,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public Timer getStepDownTimer() {
- return stepDownTimer;
+ return this.stepDownTimer;
}
public void setStepDownTimer(Timer stepDownTimer) {
@@ -559,7 +572,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public String getServerName() {
- return serverName;
+ return this.serverName;
}
public void setServerName(String serverName) {
@@ -567,7 +580,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
- return fSMCallerExecutorDisruptor;
+ return this.fSMCallerExecutorDisruptor;
}
public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
@@ -575,7 +588,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
- return nodeApplyDisruptor;
+ return this.nodeApplyDisruptor;
}
public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
@@ -583,7 +596,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
- return readOnlyServiceDisruptor;
+ return this.readOnlyServiceDisruptor;
}
public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
@@ -591,7 +604,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
- return logManagerDisruptor;
+ return this.logManagerDisruptor;
}
public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
@@ -603,11 +616,11 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
public List<Stripe> getLogStripes() {
- return logStripes;
+ return this.logStripes;
}
public HybridClock getClock() {
- return clock;
+ return this.clock;
}
public void setClock(HybridClock clock) {
@@ -656,18 +669,18 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
return nodeOptions;
}
- @Override
public String toString() {
- return "NodeOptions{" + "electionTimeoutMs=" + electionTimeoutMs + ", electionPriority=" + electionPriority
- + ", decayPriorityGap=" + decayPriorityGap + ", leaderLeaseTimeRatio=" + leaderLeaseTimeRatio
- + ", snapshotIntervalSecs=" + snapshotIntervalSecs + ", snapshotLogIndexMargin="
- + snapshotLogIndexMargin + ", catchupMargin=" + catchupMargin + ", initialConf=" + initialConf
- + ", fsm=" + fsm + ", logUri='" + logUri + '\'' + ", raftMetaUri='" + raftMetaUri + '\''
- + ", snapshotUri='" + snapshotUri + '\'' + ", filterBeforeCopyRemote=" + filterBeforeCopyRemote
- + ", disableCli=" + disableCli + ", timerPoolSize="
- + timerPoolSize + ", cliRpcThreadPoolSize=" + cliRpcThreadPoolSize + ", raftRpcThreadPoolSize="
- + raftRpcThreadPoolSize + ", enableMetrics=" + enableMetrics + ", snapshotThrottle=" + snapshotThrottle
- + ", serviceFactory=" + serviceFactory + ", raftOptions=" + raftOptions + "} " + super.toString();
+ return "NodeOptions{" + "electionTimeoutMs=" + this.electionTimeoutMs + ", electionPriority="
+ + this.electionPriority + ", decayPriorityGap=" + this.decayPriorityGap + ", leaderLeaseTimeRatio="
+ + this.leaderLeaseTimeRatio + ", snapshotIntervalSecs=" + this.snapshotIntervalSecs
+ + ", snapshotLogIndexMargin=" + this.snapshotLogIndexMargin + ", catchupMargin=" + this.catchupMargin
+ + ", initialConf=" + this.initialConf + ", fsm=" + this.fsm + ", logUri='" + this.logUri + '\''
+ + ", raftMetaUri='" + this.raftMetaUri + '\'' + ", snapshotUri='" + this.snapshotUri + '\''
+ + ", filterBeforeCopyRemote=" + this.filterBeforeCopyRemote + ", disableCli=" + this.disableCli + ", timerPoolSize="
+ + this.timerPoolSize + ", cliRpcThreadPoolSize=" + this.cliRpcThreadPoolSize + ", raftRpcThreadPoolSize="
+ + this.raftRpcThreadPoolSize + ", enableMetrics=" + this.enableMetrics + ", snapshotThrottle="
+ + this.snapshotThrottle + ", serviceFactory=" + this.serviceFactory + ", applyTaskMode="
+ + this.applyTaskMode + ", raftOptions=" + this.raftOptions + "} " + super.toString();
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
index ee92c0de16..979acc04c9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
@@ -122,6 +122,17 @@ public class RaftOptions implements Copiable<RaftOptions> {
*/
private ReadOnlyOption readOnlyOptions = ReadOnlyOption.ReadOnlySafe;
+ /**
+ * Read index read need compare current node's apply index with leader's commit index.
+ * Only current node's apply index catch up leader's commit index, then call back success to read index closure.
+ * Therefore, there is a waiting time. The default wait timeout is 2s. It means that the waiting time
+ * over 2s, then call back failure to read index closure. If current node occur problem, it's apply index maybe
+ * behind leader's commit index. In read index timeout, it can't catch up, the timeout is waste.
+ * Here supply a config to fix it. If the gap greater than maxReadIndexLag, fail fast to call back failure
+ * read index closure.
+ */
+ private int maxReadIndexLag = -1;
+
/**
* Candidate steps down when election reaching timeout, default is true(enabled).
*/
@@ -159,6 +170,14 @@ public class RaftOptions implements Copiable<RaftOptions> {
this.readOnlyOptions = readOnlyOptions;
}
+ public int getMaxReadIndexLag() {
+ return maxReadIndexLag;
+ }
+
+ public void setMaxReadIndexLag(int maxReadIndexLag) {
+ this.maxReadIndexLag = maxReadIndexLag;
+ }
+
public boolean isReplicatorPipeline() {
return this.replicatorPipeline;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 2bde5df2bf..d9a702eddd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -189,7 +189,7 @@ public final class RpcRequests {
}
@Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_RESPONSE)
- public interface AppendEntriesResponse extends Message {
+ public interface AppendEntriesResponse extends ErrorResponse {
long term();
boolean success();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index 6e0ea8cef0..a2afe58770 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -94,7 +94,7 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
configRpcClient(this.rpcClient);
// TODO asch should the client be created lazily? A client doesn't make sence without a server IGNITE-14832
- this.rpcClient.init(null);
+ this.rpcClient.init(this.rpcOptions);
this.rpcExecutor = rpcOptions.getClientExecutor();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java
index 1d4909a55b..8e2e908ecd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java
@@ -105,6 +105,14 @@ public interface LogManager extends Lifecycle<LogManagerOptions>, Describer {
*/
void join() throws InterruptedException;
+ /**
+ * Given specified <tt>requiredCapacity</tt> determines if that amount of space
+ * is available to append these entries. Returns true when available.
+ * @param requiredCapacity
+ * @return Returns true when available.
+ */
+ boolean hasAvailableCapacityToAppendEntries(final int requiredCapacity);
+
/**
* Append log entry vector and wait until it's stable (NOT COMMITTED!)
*
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index a6363108bb..82198b7c93 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -65,8 +65,6 @@ import org.apache.ignite.raft.jraft.util.Utils;
* LogManager implementation.
*/
public class LogManagerImpl implements LogManager {
- private static final int APPEND_LOG_RETRY_TIMES = 50;
-
private static final IgniteLogger LOG = Loggers.forClass(LogManagerImpl.class);
/** Raft node id. */
@@ -206,6 +204,14 @@ public class LogManagerImpl implements LogManager {
return true;
}
+ @Override
+ public boolean hasAvailableCapacityToAppendEntries(final int requiredCapacity) {
+ if (this.stopped) {
+ return false;
+ }
+ return this.diskQueue.hasAvailableCapacity(requiredCapacity);
+ }
+
private void stopDiskThread() {
if (this.diskQueue == null)
return; // Was not started.
@@ -284,6 +290,8 @@ public class LogManagerImpl implements LogManager {
@Override
public void appendEntries(final List<LogEntry> entries, final StableClosure done) {
+ assert(done != null);
+
Requires.requireNonNull(done, "done");
if (this.hasError) {
entries.clear();
@@ -293,7 +301,7 @@ public class LogManagerImpl implements LogManager {
boolean doUnlock = true;
this.writeLock.lock();
try {
- if (!entries.isEmpty() && !checkAndResolveConflict(entries, done)) {
+ if (!entries.isEmpty() && !checkAndResolveConflict(entries, done, this.writeLock)) {
// If checkAndResolveConflict returns false, the done will be called in it.
entries.clear();
return;
@@ -320,30 +328,18 @@ public class LogManagerImpl implements LogManager {
}
done.setEntries(entries);
- int retryTimes = 0;
- final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
- event.reset();
- event.nodeId = this.nodeId;
- event.type = EventType.OTHER;
- event.done = done;
- };
- while (true) {
- if (tryOfferEvent(done, translator)) {
- break;
- }
- else {
- retryTimes++;
- if (retryTimes > APPEND_LOG_RETRY_TIMES) {
- reportError(RaftError.EBUSY.getNumber(), "LogManager is busy, disk queue overload.");
- return;
- }
- ThreadHelper.onSpinWait();
- }
- }
doUnlock = false;
if (!wakeupAllWaiter(this.writeLock)) {
notifyLastLogIndexListeners();
}
+
+ // publish event out of lock
+ this.diskQueue.publishEvent((event, sequence) -> {
+ event.reset();
+ event.nodeId = this.nodeId;
+ event.type = EventType.OTHER;
+ event.done = done;
+ });
}
finally {
if (doUnlock) {
@@ -352,29 +348,24 @@ public class LogManagerImpl implements LogManager {
}
}
+ /**
+ * Adds event to disk queue, NEVER call it in lock.
+ * @param done
+ * @param type
+ */
private void offerEvent(final StableClosure done, final EventType type) {
+ assert(done != null);
+
if (this.stopped) {
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
return;
}
- if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
+ this.diskQueue.publishEvent((event, sequence) -> {
event.reset();
event.nodeId = this.nodeId;
event.type = type;
event.done = done;
- })) {
- reportError(RaftError.EBUSY.getNumber(), "Log manager is overload.");
- Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.EBUSY, "Log manager is overload."));
- }
- }
-
- private boolean tryOfferEvent(final StableClosure done, final EventTranslator<StableClosureEvent> translator) {
- if (this.stopped) {
- Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
- return true;
- }
-
- return this.diskQueue.tryPublishEvent(translator);
+ });
}
private void notifyLastLogIndexListeners() {
@@ -622,6 +613,7 @@ public class LogManagerImpl implements LogManager {
@Override
public void setSnapshot(final SnapshotMeta meta) {
LOG.debug("set snapshot: {}.", meta);
+ boolean doUnlock = true;
this.writeLock.lock();
try {
if (meta.lastIncludedIndex() <= this.lastSnapshotId.getIndex()) {
@@ -654,7 +646,9 @@ public class LogManagerImpl implements LogManager {
if (term == 0) {
// last_included_index is larger than last_index
// FIXME: what if last_included_index is less than first_index?
- truncatePrefix(meta.lastIncludedIndex() + 1);
+ doUnlock = false;
+ //unlock in truncatePrefix
+ truncatePrefix(meta.lastIncludedIndex() + 1, this.writeLock);
}
else if (term == meta.lastIncludedTerm()) {
// Truncating log to the index of the last snapshot.
@@ -663,7 +657,9 @@ public class LogManagerImpl implements LogManager {
// followers
// TODO if there are still be need? TODO asch
if (savedLastSnapshotIndex > 0) {
- truncatePrefix(savedLastSnapshotIndex + 1);
+ doUnlock = false;
+ //unlock in truncatePrefix
+ truncatePrefix(savedLastSnapshotIndex + 1, this.writeLock);
}
}
else {
@@ -673,7 +669,9 @@ public class LogManagerImpl implements LogManager {
}
}
finally {
- this.writeLock.unlock();
+ if (doUnlock) {
+ this.writeLock.unlock();
+ }
}
}
@@ -723,14 +721,19 @@ public class LogManagerImpl implements LogManager {
@Override
public void clearBufferedLogs() {
+ boolean doUnlock = true;
this.writeLock.lock();
try {
if (this.lastSnapshotId.getIndex() != 0) {
- truncatePrefix(this.lastSnapshotId.getIndex() + 1);
+ doUnlock = false;
+ //unlock in truncatePrefix
+ truncatePrefix(this.lastSnapshotId.getIndex() + 1, this.writeLock);
}
}
finally {
- this.writeLock.unlock();
+ if (doUnlock) {
+ this.writeLock.unlock();
+ }
}
}
@@ -869,12 +872,12 @@ public class LogManagerImpl implements LogManager {
return this.lastLogIndex;
}
c = new LastLogIdClosure();
- offerEvent(c, EventType.LAST_LOG_ID);
}
}
finally {
this.readLock.unlock();
}
+ offerEvent(c, EventType.LAST_LOG_ID);
try {
c.await();
}
@@ -920,12 +923,12 @@ public class LogManagerImpl implements LogManager {
return this.lastSnapshotId;
}
c = new LastLogIdClosure();
- offerEvent(c, EventType.LAST_LOG_ID);
}
}
finally {
this.readLock.unlock();
}
+ offerEvent(c, EventType.LAST_LOG_ID);
try {
c.await(); // TODO FIXME asch https://issues.apache.org/jira/browse/IGNITE-15974
}
@@ -982,7 +985,7 @@ public class LogManagerImpl implements LogManager {
}
}
- private boolean truncatePrefix(final long firstIndexKept) {
+ private boolean truncatePrefix(final long firstIndexKept, final Lock lock) {
this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().getIndex() < firstIndexKept);
@@ -997,6 +1000,7 @@ public class LogManagerImpl implements LogManager {
}
LOG.debug("Truncate prefix, firstIndexKept is :{}", firstIndexKept);
this.configManager.truncatePrefix(firstIndexKept);
+ lock.unlock();
final TruncatePrefixClosure c = new TruncatePrefixClosure(firstIndexKept);
offerEvent(c, EventType.TRUNCATE_PREFIX);
return true;
@@ -1010,16 +1014,16 @@ public class LogManagerImpl implements LogManager {
this.lastLogIndex = nextLogIndex - 1;
this.configManager.truncatePrefix(this.firstLogIndex);
this.configManager.truncateSuffix(this.lastLogIndex);
- final ResetClosure c = new ResetClosure(nextLogIndex);
- offerEvent(c, EventType.RESET);
return true;
}
finally {
this.writeLock.unlock();
+ final ResetClosure c = new ResetClosure(nextLogIndex);
+ offerEvent(c, EventType.RESET);
}
}
- private void unsafeTruncateSuffix(final long lastIndexKept) {
+ private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
if (lastIndexKept < this.appliedId.getIndex()) {
LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId,
lastIndexKept);
@@ -1033,12 +1037,14 @@ public class LogManagerImpl implements LogManager {
Requires.requireTrue(this.lastLogIndex == 0 || lastTermKept != 0);
LOG.debug("Truncate suffix :{}", lastIndexKept);
this.configManager.truncateSuffix(lastIndexKept);
+ lock.unlock();
final TruncateSuffixClosure c = new TruncateSuffixClosure(lastIndexKept, lastTermKept);
offerEvent(c, EventType.TRUNCATE_SUFFIX);
+ lock.lock();
}
@SuppressWarnings("NonAtomicOperationOnVolatileField")
- private boolean checkAndResolveConflict(final List<LogEntry> entries, final StableClosure done) {
+ private boolean checkAndResolveConflict(final List<LogEntry> entries, final StableClosure done, final Lock lock) {
final LogEntry firstLogEntry = ArrayDeque.peekFirst(entries);
if (firstLogEntry.getId().getIndex() == 0) {
// Node is currently the leader and |entries| are from the user who
@@ -1088,7 +1094,7 @@ public class LogManagerImpl implements LogManager {
if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
// Truncate all the conflicting entries to make local logs
// consensus with the leader.
- unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1);
+ unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock);
}
this.lastLogIndex = lastLogEntry.getId().getIndex();
} // else this is a duplicated AppendEntriesRequest, we have
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
index c4dfa68762..013eb67476 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
@@ -178,7 +178,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
this.writeLock.lock();
try {
if (this.db != null) {
- LOG.warn("RocksDBLogStorage init() already.");
+ LOG.warn("RocksDBLogStorage init() in {} already.", this.path);
return true;
}
this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
@@ -291,7 +291,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
return true;
}
catch (final RocksDBException e) {
- LOG.error("Fail to save first log index {}.", e, firstLogIndex);
+ LOG.error("Fail to save first log index {} in {}.", firstLogIndex, this.path, e);
return false;
}
finally {
@@ -325,7 +325,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
private boolean executeBatch(final WriteBatchTemplate template) {
this.readLock.lock();
if (this.db == null) {
- LOG.warn("DB not initialized or destroyed.");
+ LOG.warn("DB not initialized or destroyed in data path: {}.", this.path);
this.readLock.unlock();
return false;
}
@@ -458,7 +458,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
}
}
catch (final RocksDBException | IOException e) {
- LOG.error("Fail to get log entry at index {}.", e, index);
+ LOG.error("Fail to get log entry at index {} in data path: {}.", index, this.path, e);
}
finally {
this.readLock.unlock();
@@ -509,7 +509,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
this.readLock.lock();
try {
if (this.db == null) {
- LOG.warn("DB not initialized or destroyed.");
+ LOG.warn("DB not initialized or destroyed in data path: {}.", this.path);
return false;
}
final WriteContext writeCtx = newWriteContext();
@@ -593,20 +593,28 @@ public class RocksDBLogStorage implements LogStorage, Describer {
private void truncatePrefixInBackground(final long startIndex, final long firstIndexKept) {
// delete logs in background.
Utils.runInThread(executor, () -> {
+ long startMs = Utils.monotonicMs();
this.readLock.lock();
try {
- if (this.db == null) {
+ RocksDB db = this.db;
+ if (db == null) {
+ LOG.warn(
+ "DB is null while truncating prefixed logs in data path: {}, the range is: [{}, {})",
+ this.path, startIndex, firstIndexKept);
return;
}
onTruncatePrefix(startIndex, firstIndexKept);
- this.db.deleteRange(this.defaultHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
- this.db.deleteRange(this.confHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
- }
- catch (final RocksDBException | IOException e) {
- LOG.error("Fail to truncatePrefix {}.", e, firstIndexKept);
- }
- finally {
+ final byte[] startKey = getKeyBytes(startIndex);
+ final byte[] endKey = getKeyBytes(firstIndexKept);
+ // deleteRange to delete all keys in range.
+ db.deleteRange(this.defaultHandle, startKey, endKey);
+ db.deleteRange(this.confHandle, startKey, endKey);
+ } catch (final RocksDBException | IOException e) {
+ LOG.error("Fail to truncatePrefix in data path: {}, firstIndexKept={}.", this.path, firstIndexKept, e);
+ } finally {
this.readLock.unlock();
+ LOG.info("Truncated prefix logs in data path: {} from log index {} to {}, cost {} ms.",
+ this.path, startIndex, firstIndexKept, Utils.monotonicMs() - startMs);
}
});
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index 30d966add3..b62a425cd3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -230,7 +230,7 @@ public class SnapshotExecutorImpl implements SnapshotExecutor {
}
if (snapshotStorage instanceof LocalSnapshotStorage) {
final LocalSnapshotStorage tmp = (LocalSnapshotStorage) this.snapshotStorage;
- if (tmp != null && !tmp.hasServerPeerId()) {
+ if (!tmp.hasServerPeerId()) {
tmp.setServerPeerId(opts.getPeerId());
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
index 97f08d6a46..9e0a5ed98d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
@@ -223,7 +223,7 @@ public class LocalSnapshotStorage implements SnapshotStorage {
break;
}
LOG.info("Renaming {} to {}.", tempPath, newPath);
- if (!new File(tempPath).renameTo(new File(newPath))) {
+ if (!Utils.atomicMoveFile(new File(tempPath), new File(newPath), true)) {
LOG.error("Renamed temp snapshot failed, from path {} to path {}.", tempPath, newPath);
ret = RaftError.EIO.getNumber();
ioe = new IOException("Fail to rename temp snapshot from: " + tempPath + " to: " + newPath);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
index 0a27cc4894..078e64252c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
@@ -117,6 +117,17 @@ public final class Utils {
reg.register(name, new ThreadPoolMetricSet(executor));
}
+ /**
+ * Run closure in current thread.
+ * @param done
+ * @param status
+ */
+ public static void runClosure(Closure done, Status status) {
+ if (done != null) {
+ done.run(status);
+ }
+ }
+
/**
* Run closure with OK status in thread pool.
*/
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index d1d64718aa..eeac58d3f5 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -296,4 +296,52 @@ public class ReadOnlyServiceTest {
latch.await();
assertTrue(this.readOnlyServiceImpl.getPendingNotifyStatus().isEmpty());
}
+
+ @Test
+ public void testOverMaxReadIndexLag() throws Exception {
+ Mockito.when(this.fsmCaller.getLastAppliedIndex()).thenReturn(1L);
+ this.readOnlyServiceImpl.getRaftOptions().setMaxReadIndexLag(50);
+
+ byte[] requestContext = TestUtils.getRandomBytes();
+ CountDownLatch latch = new CountDownLatch(1);
+ final String errMsg =
+ "Fail to run ReadIndex task, the gap of current node's apply index between leader's commit index over maxReadIndexLag";
+ this.readOnlyServiceImpl.addRequest(requestContext, new ReadIndexClosure() {
+
+ @Override
+ public void run(Status status, long index, byte[] reqCtx) {
+ assertFalse(status.isOk());
+ assertEquals(status.getErrorMsg(), errMsg);
+ assertEquals(index, -1);
+ assertArrayEquals(reqCtx, requestContext);
+ latch.countDown();
+ }
+ });
+ this.readOnlyServiceImpl.flush();
+
+ final ArgumentCaptor<RpcResponseClosure> closureCaptor = ArgumentCaptor.forClass(RpcResponseClosure.class);
+
+ Mockito.verify(this.node).handleReadIndexRequest(Mockito.argThat(new ArgumentMatcher<ReadIndexRequest>() {
+
+ @Override
+ public boolean matches(ReadIndexRequest argument) {
+ if (argument != null) {
+ ReadIndexRequest req = (ReadIndexRequest) argument;
+ return "test".equals(req.groupId()) && "localhost-8081".equals(req.serverId())
+ && Utils.size(req.entriesList()) == 1
+ && Arrays.equals(requestContext, req.entriesList().get(0).toByteArray());
+ }
+ return false;
+ }
+
+ }), closureCaptor.capture());
+
+ RpcResponseClosure closure = closureCaptor.getValue();
+
+ assertNotNull(closure);
+
+ closure.setResponse(msgFactory.readIndexResponse().index(52).success(true).build());
+ closure.run(Status.OK());
+ latch.await();
+ }
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
index 4b7b5e70e3..ca6220a0be 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.entity;
import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
import java.util.Arrays;
import org.apache.ignite.raft.jraft.entity.codec.DefaultLogEntryCodecFactory;
import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
@@ -27,9 +28,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
public class LogEntryTest {
@Test
@@ -122,4 +125,37 @@ public class LogEntryTest {
assertNotEquals(c, entry.checksum());
assertTrue(entry.isCorrupted());
}
+
+ @Test
+ public void testSliceReadOnlyData() {
+ ByteBuffer buf = ByteBuffer.wrap("hello".getBytes());
+ LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+ entry.setData(buf);
+ assertSame(buf, entry.getData());
+ final ByteBuffer slice = entry.sliceData();
+ assertNotSame(buf, slice);
+ assertEquals(5, slice.remaining());
+ assertEquals("hello", new String(slice.array()));
+ slice.position(4);
+ assertEquals(4, slice.position());
+ assertEquals(0, entry.getData().position());
+ slice.put((byte) 'a');
+ assertEquals(97, slice.get(4));
+ assertEquals("hella", new String(entry.getData().array()));
+
+ ByteBuffer readOnly = entry.getReadOnlyData();
+ assertNotSame(buf, readOnly);
+ assertEquals(5, readOnly.remaining());
+ byte[] bs = new byte[5];
+ readOnly.get(bs);
+ assertEquals("hella", new String(bs));
+
+ try {
+ readOnly.position(4);
+ readOnly.put((byte) 1);
+ fail();
+ } catch (ReadOnlyBufferException e) {
+
+ }
+ }
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index bbc882da10..eabd8bbf27 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -133,6 +133,13 @@ public class LogManagerTest extends BaseStorageTest {
assertTrue(this.logManager.checkConsistency().isOk());
}
+ @Test
+ public void testHasAvailableCapacityToAppendEntries() {
+ assertTrue(this.logManager.hasAvailableCapacityToAppendEntries(1));
+ assertTrue(this.logManager.hasAvailableCapacityToAppendEntries(10));
+ assertFalse(this.logManager.hasAvailableCapacityToAppendEntries(1000000));
+ }
+
@Test
public void testAppendOneEntry() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java
index e984751d19..1fdb77ff62 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.core;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.error.RaftError;
@@ -24,42 +25,52 @@ import org.apache.ignite.raft.jraft.error.RaftError;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ExpectClosure implements Closure {
- private int expectedErrCode;
- private String expectErrMsg;
- private CountDownLatch latch;
+ private final int expectedErrCode;
+ private final String expectErrMsg;
+ private final CountDownLatch latch;
+ private AtomicInteger successCount;
- public ExpectClosure(CountDownLatch latch) {
+ public ExpectClosure(final CountDownLatch latch) {
this(RaftError.SUCCESS, latch);
}
- public ExpectClosure(RaftError expectedErrCode, CountDownLatch latch) {
+ public ExpectClosure(final RaftError expectedErrCode, final CountDownLatch latch) {
this(expectedErrCode, null, latch);
}
- public ExpectClosure(RaftError expectedErrCode, String expectErrMsg, CountDownLatch latch) {
+ public ExpectClosure(final RaftError expectedErrCode, final String expectErrMsg, final CountDownLatch latch) {
super();
this.expectedErrCode = expectedErrCode.getNumber();
this.expectErrMsg = expectErrMsg;
this.latch = latch;
}
- public ExpectClosure(int code, String expectErrMsg, CountDownLatch latch) {
+ public ExpectClosure(final int code, final String expectErrMsg, final CountDownLatch latch) {
+ this(code, expectErrMsg, latch, null);
+ }
+
+ public ExpectClosure(final int code, final String expectErrMsg, final CountDownLatch latch,
+ final AtomicInteger successCount) {
super();
this.expectedErrCode = code;
this.expectErrMsg = expectErrMsg;
this.latch = latch;
+ this.successCount = successCount;
}
@Override
- public void run(Status status) {
+ public void run(final Status status) {
if (this.expectedErrCode >= 0) {
assertEquals(this.expectedErrCode, status.getCode());
}
if (this.expectErrMsg != null) {
assertEquals(this.expectErrMsg, status.getErrorMsg());
}
- latch.countDown();
+ if (status.isOk() && this.successCount != null) {
+ this.successCount.incrementAndGet();
+ }
+ this.latch.countDown();
}
}
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
index 84c11a97f8..c63b0ad96c 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
@@ -35,6 +35,7 @@ import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
@@ -165,7 +166,8 @@ public class MockStateMachine extends StateMachineAdapter {
@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
- this.lastAppliedIndex.set(0);
+ SnapshotMeta meta = reader.load();
+ this.lastAppliedIndex.set(meta.lastIncludedIndex());
this.loadSnapshotTimes++;
final String path = reader.getPath() + File.separator + "data";
final File file = new File(path);