You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/22 02:42:51 UTC
[incubator-ratis] branch master updated: RATIS-1256. Leader updateCommit should use the updated index. (#369)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 195c572 RATIS-1256. Leader updateCommit should use the updated index. (#369)
195c572 is described below
commit 195c572024fc5fd88d00c3c0985c01215d2719aa
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Dec 22 10:41:24 2020 +0800
RATIS-1256. Leader updateCommit should use the updated index. (#369)
* RATIS-1256. Leader updateCommit should use the updated index.
* Fix a bug.
---
.../ratis/server/metrics/RaftLogMetrics.java | 4 +-
.../apache/ratis/server/protocol/TermIndex.java | 12 +--
.../ratis/server/raftlog/LogEntryHeader.java | 86 ++++++++++++++++++++++
.../org/apache/ratis/server/raftlog/RaftLog.java | 13 +---
.../apache/ratis/server/impl/LeaderStateImpl.java | 86 ++++++++++------------
.../ratis/server/metrics/RaftLogMetricsBase.java | 22 ++++--
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 20 ++---
.../ratis/server/raftlog/segmented/LogSegment.java | 23 ++----
.../server/raftlog/segmented/SegmentedRaftLog.java | 8 +-
.../raftlog/segmented/SegmentedRaftLogCache.java | 19 ++---
.../org/apache/ratis/OutputStreamBaseTest.java | 9 ++-
.../test/java/org/apache/ratis/RaftTestUtil.java | 8 +-
.../ratis/datastream/DataStreamTestUtils.java | 3 +-
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 6 +-
.../server/raftlog/memory/MemoryRaftLogTest.java | 5 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 5 +-
.../segmented/TestSegmentedRaftLogCache.java | 3 +-
17 files changed, 200 insertions(+), 132 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
index 1e0ec99..42c1605 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -17,10 +17,10 @@
*/
package org.apache.ratis.server.metrics;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
/** Metrics for a raft log. */
public interface RaftLogMetrics {
/** A log entry just has been committed. */
- void onLogEntryCommitted(LogEntryProto proto);
+ void onLogEntryCommitted(LogEntryHeader header);
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index c4f90b9..7def686 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -41,6 +41,13 @@ public interface TermIndex extends Comparable<TermIndex> {
.build();
}
+ @Override
+ default int compareTo(TermIndex that) {
+ return Comparator.comparingLong(TermIndex::getTerm)
+ .thenComparingLong(TermIndex::getIndex)
+ .compare(this, that);
+ }
+
/** @return a {@link TermIndex} object from the given proto. */
static TermIndex valueOf(TermIndexProto proto) {
return Optional.ofNullable(proto).map(p -> valueOf(p.getTerm(), p.getIndex())).orElse(null);
@@ -65,11 +72,6 @@ public interface TermIndex extends Comparable<TermIndex> {
}
@Override
- public int compareTo(TermIndex that) {
- return Comparator.comparingLong(TermIndex::getTerm).thenComparingLong(TermIndex::getIndex).compare(this, that);
- }
-
- @Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/LogEntryHeader.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/LogEntryHeader.java
new file mode 100644
index 0000000..5300ba5
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/LogEntryHeader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.raftlog;
+
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
+import org.apache.ratis.server.protocol.TermIndex;
+
+import java.util.Comparator;
+
+/** The header of a {@link LogEntryProto} including {@link TermIndex} and {@link LogEntryBodyCase}. */
+public interface LogEntryHeader extends Comparable<LogEntryHeader> {
+ LogEntryHeader[] EMPTY_ARRAY = {};
+
+ /** @return the {@link TermIndex}. */
+ TermIndex getTermIndex();
+
+ default long getTerm() {
+ return getTermIndex().getTerm();
+ }
+
+ default long getIndex() {
+ return getTermIndex().getIndex();
+ }
+
+ /** @return the {@link LogEntryBodyCase}. */
+ LogEntryBodyCase getLogEntryBodyCase();
+
+ static LogEntryHeader valueOf(LogEntryProto entry) {
+ return valueOf(TermIndex.valueOf(entry), entry.getLogEntryBodyCase());
+ }
+
+ static LogEntryHeader valueOf(TermIndex ti, LogEntryBodyCase logEntryBodyCase) {
+ return new LogEntryHeader() {
+ @Override
+ public TermIndex getTermIndex() {
+ return ti;
+ }
+
+ @Override
+ public LogEntryBodyCase getLogEntryBodyCase() {
+ return logEntryBodyCase;
+ }
+
+ @Override
+ public int compareTo(LogEntryHeader that) {
+ return Comparator.comparing(LogEntryHeader::getTermIndex)
+ .thenComparing(LogEntryHeader::getLogEntryBodyCase)
+ .compare(this, that);
+ }
+
+ @Override
+ public int hashCode() {
+ return ti.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (!(obj instanceof LogEntryHeader)) {
+ return false;
+ }
+
+ final LogEntryHeader that = (LogEntryHeader) obj;
+ return this.getLogEntryBodyCase() == that.getLogEntryBodyCase()
+ && this.getTermIndex().equals(that.getTermIndex());
+ }
+ };
+ }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index f84d010..e504462 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -50,13 +50,6 @@ public interface RaftLog extends RaftLogSequentialOps, Closeable {
}
/**
- * Is the entry corresponding to the given {@link TermIndex} a configuration entry?
- * In other words, the corresponding entry exists and
- * {@link LogEntryProto#hasConfigurationEntry()} returns true.
- */
- boolean isConfigEntry(TermIndex ti);
-
- /**
* @return null if the log entry is not found in this log;
* otherwise, return the {@link TermIndex} of the log entry corresponding to the given index.
*/
@@ -77,10 +70,10 @@ public interface RaftLog extends RaftLogSequentialOps, Closeable {
/**
* @param startIndex the starting log index (inclusive)
* @param endIndex the ending log index (exclusive)
- * @return an array of {@link TermIndex} of all log entries within the given index range. Null if
- * startIndex is greater than the smallest available index.
+ * @return null if entries are unavailable in this log;
+ * otherwise, return the log entry headers within the given index range.
*/
- TermIndex[] getEntries(long startIndex, long endIndex);
+ LogEntryHeader[] getEntries(long startIndex, long endIndex);
/** @return the index of the starting entry of this log. */
long getStartIndex();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 363ae3a..5db9158 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@@ -39,9 +40,9 @@ import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.metrics.LogAppenderMetrics;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
@@ -719,31 +720,34 @@ class LeaderStateImpl implements LeaderState {
}
}
+ private void updateCommit(LogEntryHeader[] entriesToCommit) {
+ final long newCommitIndex = raftLog.getLastCommittedIndex();
+ logMetadata(newCommitIndex);
+ commitIndexChanged();
+
+ boolean hasConfiguration = false;
+ for (LogEntryHeader entry : entriesToCommit) {
+ if (entry.getIndex() > newCommitIndex) {
+ break;
+ }
+ hasConfiguration |= entry.getLogEntryBodyCase() == LogEntryBodyCase.CONFIGURATIONENTRY;
+ raftLog.getRaftLogMetrics().onLogEntryCommitted(entry);
+ }
+ if (hasConfiguration) {
+ checkAndUpdateConfiguration();
+ }
+ }
+
private void updateCommit(long majority, long min) {
final long oldLastCommitted = raftLog.getLastCommittedIndex();
if (majority > oldLastCommitted) {
- // copy the entries out from the raftlog, in order to prevent that
- // the log gets purged after the statemachine does a snapshot
- final TermIndex[] entriesToCommit = raftLog.getEntries(
- oldLastCommitted + 1, majority + 1);
+ // Get the headers before updating commit index since the log can be purged after a snapshot
+ final LogEntryHeader[] entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, majority + 1);
if (server.getState().updateCommitIndex(majority, currentTerm, true)) {
- watchRequests.update(ReplicationLevel.MAJORITY, majority);
- logMetadata(majority);
- commitIndexChanged();
- }
-
- try {
- for (TermIndex entry : entriesToCommit) {
- raftLog.getRaftLogMetrics().onLogEntryCommitted(raftLog.get(entry.getIndex()));
- }
- } catch (RaftLogIOException e) {
- LOG.error("Caught exception reading from RaftLog", e);
+ updateCommit(entriesToCommit);
}
-
- checkAndUpdateConfiguration(entriesToCommit);
}
-
watchRequests.update(ReplicationLevel.ALL, min);
}
@@ -752,36 +756,24 @@ class LeaderStateImpl implements LeaderState {
notifySenders();
}
- private boolean committedConf(TermIndex[] entries) {
- final long currentCommitted = raftLog.getLastCommittedIndex();
- for (TermIndex entry : entries) {
- if (entry.getIndex() <= currentCommitted && raftLog.isConfigEntry(entry)) {
- return true;
- }
- }
- return false;
- }
-
- private void checkAndUpdateConfiguration(TermIndex[] entriesToCheck) {
+ private void checkAndUpdateConfiguration() {
final RaftConfigurationImpl conf = server.getRaftConf();
- if (committedConf(entriesToCheck)) {
- if (conf.isTransitional()) {
- replicateNewConf();
- } else { // the (new) log entry has been committed
- pendingRequests.replySetConfiguration(server::newSuccessReply);
- // if the leader is not included in the current configuration, step down
- if (!conf.containsInConf(server.getId())) {
- LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
- try {
- // leave some time for all RPC senders to send out new conf entry
- server.properties().minRpcTimeout().sleep();
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- // the pending request handler will send NotLeaderException for
- // pending client requests when it stops
- server.close();
+ if (conf.isTransitional()) {
+ replicateNewConf();
+ } else { // the (new) log entry has been committed
+ pendingRequests.replySetConfiguration(server::newSuccessReply);
+ // if the leader is not included in the current configuration, step down
+ if (!conf.containsInConf(server.getId())) {
+ LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
+ try {
+ // leave some time for all RPC senders to send out new conf entry
+ server.properties().minRpcTimeout().sleep();
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
}
+ // the pending request handler will send NotLeaderException for
+ // pending client requests when it stops
+ server.close();
}
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java
index f5ef1d6..aef6398 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java
@@ -22,8 +22,8 @@ import com.codahale.metrics.Timer;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
public class RaftLogMetricsBase extends RatisMetrics implements RaftLogMetrics {
public static final String RATIS_LOG_WORKER_METRICS_DESC = "Metrics for Log Worker";
@@ -56,13 +56,19 @@ public class RaftLogMetricsBase extends RatisMetrics implements RaftLogMetrics {
return registry.timer(timerName);
}
- public void onLogEntryCommitted(LogEntryProto proto) {
- if (proto.hasConfigurationEntry()) {
- registry.counter(CONFIG_LOG_ENTRY_COUNT).inc();
- } else if (proto.hasMetadataEntry()) {
- registry.counter(METADATA_LOG_ENTRY_COUNT).inc();
- } else if (proto.hasStateMachineLogEntry()) {
- registry.counter(STATE_MACHINE_LOG_ENTRY_COUNT).inc();
+ @Override
+ public void onLogEntryCommitted(LogEntryHeader header) {
+ switch (header.getLogEntryBodyCase()) {
+ case CONFIGURATIONENTRY:
+ registry.counter(CONFIG_LOG_ENTRY_COUNT).inc();
+ return;
+ case METADATAENTRY:
+ registry.counter(METADATA_LOG_ENTRY_COUNT).inc();
+ return;
+ case STATEMACHINELOGENTRY:
+ registry.counter(STATE_MACHINE_LOG_ENTRY_COUNT).inc();
+ return;
+ default:
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 7fa91bc..6f80cdd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.metrics.RaftLogMetricsBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.raftlog.RaftLogBase;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
@@ -49,6 +50,10 @@ public class MemoryRaftLog extends RaftLogBase {
return TermIndex.valueOf(get(i));
}
+ private LogEntryHeader getLogEntryHeader(int i) {
+ return LogEntryHeader.valueOf(get(i));
+ }
+
int size() {
return entries.size();
}
@@ -108,7 +113,7 @@ public class MemoryRaftLog extends RaftLogBase {
}
@Override
- public TermIndex[] getEntries(long startIndex, long endIndex) {
+ public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
if (startIndex >= entries.size()) {
@@ -116,11 +121,11 @@ public class MemoryRaftLog extends RaftLogBase {
}
final int from = Math.toIntExact(startIndex);
final int to = Math.toIntExact(Math.min(entries.size(), endIndex));
- TermIndex[] ti = new TermIndex[to - from];
- for (int i = 0; i < ti.length; i++) {
- ti[i] = entries.getTermIndex(i);
+ final LogEntryHeader[] headers = new LogEntryHeader[to - from];
+ for (int i = 0; i < headers.length; i++) {
+ headers[i] = entries.getLogEntryHeader(i);
}
- return ti;
+ return headers;
}
}
@@ -231,9 +236,4 @@ public class MemoryRaftLog extends RaftLogBase {
return CompletableFuture.completedFuture(lastSnapshotIndex);
// do nothing
}
-
- @Override
- public boolean isConfigEntry(TermIndex ti) {
- return get(ti.getIndex()).hasConfigurationEntry();
- }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 665d407..672d304 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -21,6 +21,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
@@ -35,11 +36,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -88,15 +87,19 @@ public class LogSegment implements Comparable<Long> {
static class LogRecord {
/** starting offset in the file */
private final long offset;
- private final TermIndex termIndex;
+ private final LogEntryHeader logEntryHeader;
LogRecord(long offset, LogEntryProto entry) {
this.offset = offset;
- this.termIndex = TermIndex.valueOf(entry);
+ this.logEntryHeader = LogEntryHeader.valueOf(entry);
+ }
+
+ LogEntryHeader getLogEntryHeader() {
+ return logEntryHeader;
}
TermIndex getTermIndex() {
- return termIndex;
+ return getLogEntryHeader().getTermIndex();
}
long getOffset() {
@@ -273,7 +276,6 @@ public class LogSegment implements Comparable<Long> {
* the entryCache caches the content of log entries.
*/
private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
- private final Set<TermIndex> configEntries = new HashSet<>();
private LogSegment(RaftStorage storage, boolean isOpen, long start, long end,
SegmentedRaftLogMetrics raftLogMetrics) {
@@ -328,9 +330,6 @@ public class LogSegment implements Comparable<Long> {
if (keepEntryInCache) {
putEntryCache(record.getTermIndex(), entry, op);
}
- if (entry.hasConfigurationEntry()) {
- configEntries.add(record.getTermIndex());
- }
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
}
@@ -370,10 +369,6 @@ public class LogSegment implements Comparable<Long> {
return last == null ? null : last.getTermIndex();
}
- boolean isConfigEntry(TermIndex ti) {
- return configEntries.contains(ti);
- }
-
long getTotalFileSize() {
return totalFileSize;
}
@@ -390,7 +385,6 @@ public class LogSegment implements Comparable<Long> {
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
- configEntries.remove(removed.getTermIndex());
totalFileSize = removed.offset;
}
isOpen = false;
@@ -417,7 +411,6 @@ public class LogSegment implements Comparable<Long> {
synchronized void clear() {
records.clear();
evictCache();
- configEntries.clear();
endIndex = startIndex - 1;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1142bcf..925d4de 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -326,7 +327,7 @@ public class SegmentedRaftLog extends RaftLogBase {
}
@Override
- public TermIndex[] getEntries(long startIndex, long endIndex) {
+ public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getTermIndices(startIndex, endIndex);
@@ -499,11 +500,6 @@ public class SegmentedRaftLog extends RaftLogBase {
}
@Override
- public boolean isConfigEntry(TermIndex ti) {
- return cache.isConfigEntry(ti);
- }
-
- @Override
public void close() throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
super.close();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 2325556..60a55a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
@@ -221,8 +222,8 @@ public class SegmentedRaftLogCache {
}
}
- TermIndex[] getTermIndex(long startIndex, long realEnd, LogSegment openSegment) {
- final TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)];
+ LogEntryHeader[] getTermIndex(long startIndex, long realEnd, LogSegment openSegment) {
+ final LogEntryHeader[] entries = new LogEntryHeader[Math.toIntExact(realEnd - startIndex)];
final int searchIndex;
long index = startIndex;
@@ -487,7 +488,7 @@ public class SegmentedRaftLogCache {
* @param startIndex inclusive
* @param endIndex exclusive
*/
- TermIndex[] getTermIndices(final long startIndex, final long endIndex) {
+ LogEntryHeader[] getTermIndices(final long startIndex, final long endIndex) {
if (startIndex < 0 || startIndex < getStartIndex()) {
throw new IndexOutOfBoundsException("startIndex = " + startIndex
+ ", log cache starts from index " + getStartIndex());
@@ -498,27 +499,21 @@ public class SegmentedRaftLogCache {
}
final long realEnd = Math.min(getEndIndex() + 1, endIndex);
if (startIndex >= realEnd) {
- return TermIndex.EMPTY_ARRAY;
+ return LogEntryHeader.EMPTY_ARRAY;
}
return closedSegments.getTermIndex(startIndex, realEnd, openSegment);
}
private static void getFromSegment(LogSegment segment, long startIndex,
- TermIndex[] entries, int offset, int size) {
+ LogEntryHeader[] entries, int offset, int size) {
long endIndex = segment.getEndIndex();
endIndex = Math.min(endIndex, startIndex + size - 1);
int index = offset;
for (long i = startIndex; i <= endIndex; i++) {
- LogRecord r = segment.getLogRecord(i);
- entries[index++] = r == null ? null : r.getTermIndex();
+ entries[index++] = Optional.ofNullable(segment.getLogRecord(i)).map(LogRecord::getLogEntryHeader).orElse(null);
}
}
- boolean isConfigEntry(TermIndex ti) {
- LogSegment segment = getSegment(ti.getIndex());
- return segment != null && segment.isConfigEntry(ti);
- }
-
long getStartIndex() {
if (closedSegments.isEmpty()) {
return Optional.ofNullable(openSegment).map(LogSegment::getStartIndex).orElse(RaftLog.INVALID_LOG_INDEX);
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 823dfb2..aa69fa4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
@@ -95,9 +96,9 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
long committedIndex = raftLog.getLastCommittedIndex();
Assert.assertTrue(committedIndex >= expectedCommittedIndex);
// check the log content
- TermIndex[] entries = raftLog.getEntries(0, Long.MAX_VALUE);
+ final LogEntryHeader[] entries = raftLog.getEntries(0, Long.MAX_VALUE);
int count = 0;
- for (TermIndex entry : entries) {
+ for (LogEntryHeader entry : entries) {
LogEntryProto log = raftLog.get(entry.getIndex());
if (!log.hasStateMachineLogEntry()) {
continue;
@@ -244,10 +245,10 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
final int expectedEntries = 8;
final RaftLog raftLog = assertRaftLog(expectedEntries, leader);
- final TermIndex[] entries = raftLog.getEntries(1, Long.MAX_VALUE);
+ final LogEntryHeader[] entries = raftLog.getEntries(1, Long.MAX_VALUE);
final byte[] actual = new byte[ByteValue.BUFFERSIZE * expectedEntries];
totalSize = 0;
- for (TermIndex ti : entries) {
+ for (LogEntryHeader ti : entries) {
final LogEntryProto e = raftLog.get(ti.getIndex());
if (e.hasStateMachineLogEntry()) {
final byte[] eValue = e.getStateMachineLogEntry().getLogData().toByteArray();
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 6cfe89f..6987d0a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,7 +31,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogBase;
@@ -156,7 +156,7 @@ public interface RaftTestUtil {
static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
int idxEntries = 0;
int idxExpected = 0;
- TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
+ final LogEntryHeader[] termIndices = log.getEntries(startIndex, endIndex);
while (idxEntries < termIndices.length
&& idxExpected < expectedMessages.length) {
try {
@@ -177,7 +177,7 @@ public interface RaftTestUtil {
static boolean logEntriesNotContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
int idxEntries = 0;
int idxExpected = 0;
- TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
+ final LogEntryHeader[] termIndices = log.getEntries(startIndex, endIndex);
while (idxEntries < termIndices.length
&& idxExpected < expectedMessages.length) {
try {
@@ -197,7 +197,7 @@ public interface RaftTestUtil {
static void checkLogEntries(RaftLog log, SimpleMessage[] expectedMessages,
Predicate<LogEntryProto> predicate) {
- TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
+ final LogEntryHeader[] termIndices = log.getEntries(0, Long.MAX_VALUE);
for (int i = 0; i < termIndices.length; i++) {
for (int j = 0; j < expectedMessages.length; j++) {
final LogEntryProto e;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index f7cdc36..89eed98 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -38,6 +38,7 @@ import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
@@ -359,7 +360,7 @@ public interface DataStreamTestUtils {
}
static LogEntryProto searchLogEntry(ClientInvocationId invocationId, RaftLog log) throws Exception {
- for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
+ for (LogEntryHeader termIndex : log.getEntries(0, Long.MAX_VALUE)) {
final LogEntryProto entry = log.get(termIndex.getIndex());
if (entry.hasStateMachineLogEntry()) {
if (invocationId.match(entry.getStateMachineLogEntry())) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 4e92efd..d5dd0c3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -24,7 +24,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -96,11 +96,11 @@ public class TestRaftWithGrpc
cluster.getServerAliveStream().filter(impl -> !impl.getInfo().isLeader()).forEach(raftServer ->
JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> {
final long leaderNextIndex = leaderLog.getNextIndex();
- final TermIndex[] leaderEntries = leaderLog.getEntries(0, Long.MAX_VALUE);
+ final LogEntryHeader[] leaderEntries = leaderLog.getEntries(0, Long.MAX_VALUE);
final RaftLog followerLog = raftServer.getRaftLog();
Assert.assertEquals(leaderNextIndex, followerLog.getNextIndex());
- final TermIndex[] serverEntries = followerLog.getEntries(0, Long.MAX_VALUE);
+ final LogEntryHeader[] serverEntries = followerLog.getEntries(0, Long.MAX_VALUE);
Assert.assertArrayEquals(serverEntries, leaderEntries);
}, 10, HUNDRED_MILLIS, "assertRaftLog-" + raftServer.getId(), LOG)));
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
index 6c893b3..c2cfb75 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -60,7 +61,7 @@ public class MemoryRaftLogTest extends BaseTest {
entries2[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(0).build();
raftLog.append(entries2).forEach(CompletableFuture::join);
- TermIndex[] termIndices = raftLog.getEntries(0, 10);
+ final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
assertEquals(2, termIndices.length);
for (int i = 0; i < 2; i++) {
assertEquals(entries1[i].getIndex(), termIndices[i].getIndex());
@@ -88,7 +89,7 @@ public class MemoryRaftLogTest extends BaseTest {
entries2[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(2).build();
raftLog.append(entries2).forEach(CompletableFuture::join);
- TermIndex[] termIndices = raftLog.getEntries(0, 10);
+ final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
assertEquals(1, termIndices.length);
assertEquals(entries2[0].getIndex(), termIndices[0].getIndex());
assertEquals(entries2[0].getTerm(), termIndices[0].getTerm());
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 2bc208b..e1fff14 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -34,6 +34,7 @@ import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.metrics.RaftLogMetricsBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
@@ -198,7 +199,7 @@ public class TestSegmentedRaftLog extends BaseTest {
Assert.assertEquals(e, entry);
}
- TermIndex[] termIndices = raftLog.getEntries(0, 500);
+ final LogEntryHeader[] termIndices = raftLog.getEntries(0, 500);
LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
.map(ti -> {
try {
@@ -406,7 +407,7 @@ public class TestSegmentedRaftLog extends BaseTest {
LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
Assert.assertEquals(expected.get(i), entry);
}
- TermIndex[] termIndices = raftLog.getEntries(
+ final LogEntryHeader[] termIndices = raftLog.getEntries(
expected.get(offset).getIndex(),
expected.get(offset + size - 1).getIndex() + 1);
LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 675b942..976e9d6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -29,6 +29,7 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
@@ -93,7 +94,7 @@ public class TestSegmentedRaftLogCache {
}
private void checkCacheEntries(long offset, int size, long end) {
- TermIndex[] entries = cache.getTermIndices(offset, offset + size);
+ final LogEntryHeader[] entries = cache.getTermIndices(offset, offset + size);
long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
Assert.assertEquals(realEnd - offset, entries.length);
for (long i = offset; i < realEnd; i++) {