You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/02/28 23:14:25 UTC
incubator-ratis git commit: RATIS-19. Include clientId and callId in
log entries.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 8ac50a721 -> 392fa5141
RATIS-19. Include clientId and callId in log entries.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/392fa514
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/392fa514
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/392fa514
Branch: refs/heads/master
Commit: 392fa51415047b6e265a73d4ef175f1a468535b7
Parents: 8ac50a7
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Feb 28 15:13:42 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Feb 28 15:13:42 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/ratis/util/ProtoUtils.java | 5 ++++-
ratis-proto-shaded/src/main/proto/Raft.proto | 5 +++++
.../ratis/server/impl/RaftServerImpl.java | 3 ++-
.../apache/ratis/server/impl/ServerState.java | 6 ++++--
.../apache/ratis/server/storage/RaftLog.java | 8 ++++---
.../ratis/server/storage/TestRaftLogCache.java | 10 ++++++---
.../server/storage/TestRaftLogReadWrite.java | 20 ++++++++++++------
.../server/storage/TestRaftLogSegment.java | 22 ++++++++++++--------
.../server/storage/TestSegmentedRaftLog.java | 7 +++++--
9 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 8d9c25e..2613342 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -113,9 +114,11 @@ public class ProtoUtils {
}
public static LogEntryProto toLogEntryProto(
- SMLogEntryProto operation, long term, long index) {
+ SMLogEntryProto operation, long term, long index,
+ ClientId clientId, long callId) {
return LogEntryProto.newBuilder().setTerm(term).setIndex(index)
.setSmLogEntry(operation)
+ .setClientId(toByteString(clientId.toBytes())).setCallId(callId)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 182a905..14901f6 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -50,6 +50,11 @@ message LogEntryProto {
RaftConfigurationProto configurationEntry = 4;
LeaderNoOp noOp = 5;
}
+
+ // clientId and callId are used to rebuild the retry cache. They're not
+ // necessary for configuration change since re-conf is idempotent.
+ bytes clientId = 6;
+ uint64 callId = 7;
}
message TermIndexProto {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 470141c..d729914 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -380,7 +380,8 @@ public class RaftServerImpl implements RaftServer {
// append the message to its local log
final long entryIndex;
try {
- entryIndex = state.applyLog(entry);
+ entryIndex = state.applyLog(entry, request.getClientId(),
+ request.getSeqNum());
} catch (IOException e) {
throw new RaftException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index dd2d784..4b7efbd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -24,6 +24,7 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.MemoryRaftLog;
@@ -204,8 +205,9 @@ public class ServerState implements Closeable {
return log;
}
- long applyLog(TransactionContext operation) throws IOException {
- return log.append(currentTerm, operation);
+ long applyLog(TransactionContext operation, ClientId clientId, long callId)
+ throws IOException {
+ return log.append(currentTerm, operation, clientId, callId);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index f0c7b60..32422d3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.ConfigurationManager;
import org.apache.ratis.server.impl.RaftConfiguration;
@@ -125,7 +126,8 @@ public abstract class RaftLog implements Closeable {
* Used by the leader.
* @return the index of the new log entry.
*/
- public long append(long term, TransactionContext operation) throws IOException {
+ public long append(long term, TransactionContext operation,
+ ClientId clientId, long callId) throws IOException {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
final long nextIndex = getNextIndex();
@@ -136,7 +138,7 @@ public abstract class RaftLog implements Closeable {
// build the log entry after calling the StateMachine
final LogEntryProto e = ProtoUtils.toLogEntryProto(
- operation.getSMLogEntry().get(), term, nextIndex);
+ operation.getSMLogEntry().get(), term, nextIndex, clientId, callId);
appendEntry(e);
operation.setLogEntry(e);
@@ -207,7 +209,7 @@ public abstract class RaftLog implements Closeable {
* If an existing entry conflicts with a new one (same index but different
* terms), delete the existing entry and all entries that follow it (�5.3).
*
- * This method, {@link #append(long, TransactionContext)},
+ * This method, {@link #append(long, TransactionContext, ClientId, long)},
* {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
* do not guarantee the changes are persisted.
* Need to call {@link #logSync()} to persist the changes.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index f5a18ac..388c269 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
import java.util.Iterator;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.ProtoUtils;
@@ -28,6 +29,9 @@ import org.junit.Before;
import org.junit.Test;
public class TestRaftLogCache {
+ private static final ClientId clientId = ClientId.createId();
+ private static final long callId = 0;
+
private RaftLogCache cache;
@Before
@@ -40,7 +44,7 @@ public class TestRaftLogCache {
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- 0, i);
+ 0, i, clientId, callId);
s.appendToOpenSegment(entry);
}
if (!isOpen) {
@@ -130,7 +134,7 @@ public class TestRaftLogCache {
final SimpleOperation m = new SimpleOperation("m");
try {
LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- 0, 0);
+ 0, 0, clientId, callId);
cache.appendEntry(entry);
Assert.fail("the open segment is null");
} catch (IllegalStateException ignored) {
@@ -140,7 +144,7 @@ public class TestRaftLogCache {
cache.addSegment(openSegment);
for (long index = 101; index < 200; index++) {
LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- 0, index);
+ 0, index, clientId, callId);
cache.appendEntry(entry);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
index bcdb958..f72d007 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
@@ -34,6 +34,7 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ChecksumException;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
@@ -53,9 +54,11 @@ import org.slf4j.LoggerFactory;
public class TestRaftLogReadWrite {
private static final Logger LOG = LoggerFactory.getLogger(TestRaftLogReadWrite.class);
+ private static final ClientId clientId = ClientId.createId();
+ private static final long callId = 0;
+
private File storageDir;
private RaftProperties properties;
- private int segmentMaxSize;
@Before
public void setup() throws Exception {
@@ -90,7 +93,8 @@ public class TestRaftLogReadWrite {
long size = 0;
for (int i = 0; i < entries.length; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+ entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+ clientId, callId);
final int s = entries[i].getSerializedSize();
size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4;
out.write(entries[i]);
@@ -131,7 +135,8 @@ public class TestRaftLogReadWrite {
new LogOutputStream(openSegment, false, properties)) {
for (int i = 0; i < 100; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+ entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+ clientId, callId);
out.write(entries[i]);
}
}
@@ -140,7 +145,8 @@ public class TestRaftLogReadWrite {
new LogOutputStream(openSegment, true, properties)) {
for (int i = 100; i < 200; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+ entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+ clientId, callId);
out.write(entries[i]);
}
}
@@ -196,7 +202,8 @@ public class TestRaftLogReadWrite {
LogOutputStream out = new LogOutputStream(openSegment, false, properties);
for (int i = 0; i < 10; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+ entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+ clientId, callId);
out.write(entries[i]);
}
out.flush();
@@ -243,7 +250,8 @@ public class TestRaftLogReadWrite {
new LogOutputStream(openSegment, false, properties)) {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = ProtoUtils.toLogEntryProto(
- new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
+ new SimpleOperation("m" + i).getLogEntryContent(), 0, i,
+ clientId, callId);
out.write(entry);
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index 3092a21..fa72f64 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -32,6 +32,7 @@ import java.util.List;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@@ -47,6 +48,9 @@ import org.junit.Test;
* Test basic functionality of {@link LogSegment}
*/
public class TestRaftLogSegment {
+ private static final ClientId clientId = ClientId.createId();
+ private static final long callId = 0;
+
private File storageDir;
private final RaftProperties properties = new RaftProperties();
@@ -75,7 +79,7 @@ public class TestRaftLogSegment {
for (int i = 0; i < size; i++) {
SimpleOperation op = new SimpleOperation("m" + i);
entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- term, i + start);
+ term, i + start, clientId, callId);
out.write(entries[i]);
}
}
@@ -132,7 +136,7 @@ public class TestRaftLogSegment {
while (size < max) {
SimpleOperation op = new SimpleOperation("m" + i);
LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- term, i++ + start);
+ term, i++ + start, clientId, callId);
size += getEntrySize(entry);
list.add(entry);
}
@@ -148,18 +152,18 @@ public class TestRaftLogSegment {
SimpleOperation op = new SimpleOperation("m");
final SMLogEntryProto m = op.getLogEntryContent();
try {
- LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001);
+ LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001, clientId, callId);
segment.appendToOpenSegment(entry);
Assert.fail("should fail since the entry's index needs to be 1000");
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalArgumentException);
}
- LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000);
+ LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000, clientId, callId);
segment.appendToOpenSegment(entry);
try {
- entry = ProtoUtils.toLogEntryProto(m, 0, 1002);
+ entry = ProtoUtils.toLogEntryProto(m, 0, 1002, clientId, callId);
segment.appendToOpenSegment(entry);
Assert.fail("should fail since the entry's index needs to be 1001");
} catch (Exception e) {
@@ -168,7 +172,7 @@ public class TestRaftLogSegment {
LogEntryProto[] entries = new LogEntryProto[2];
for (int i = 0; i < 2; i++) {
- entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2);
+ entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2, clientId, callId);
}
try {
segment.appendToOpenSegment(entries);
@@ -185,7 +189,7 @@ public class TestRaftLogSegment {
LogSegment segment = LogSegment.newOpenSegment(start);
for (int i = 0; i < 100; i++) {
LogEntryProto entry = ProtoUtils.toLogEntryProto(
- new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
+ new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId);
segment.appendToOpenSegment(entry);
}
@@ -251,7 +255,7 @@ public class TestRaftLogSegment {
getProperties(1024, 1024))) {
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- 0, 0);
+ 0, 0, clientId, callId);
size = LogSegment.getEntrySize(entry);
out.write(entry);
}
@@ -282,7 +286,7 @@ public class TestRaftLogSegment {
Arrays.fill(content, (byte) 1);
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- 0, 0);
+ 0, 0, clientId, callId);
final long entrySize = LogSegment.getEntrySize(entry);
long totalSize = SegmentedRaftLog.HEADER_BYTES.length;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 08f671f..9b88321 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -33,6 +33,7 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.ConfigurationManager;
@@ -53,6 +54,8 @@ public class TestSegmentedRaftLog {
}
private static final RaftPeerId peerId = new RaftPeerId("s0");
+ private static final ClientId clientId = ClientId.createId();
+ private static final long callId = 0;
private static class SegmentRange {
final long start;
@@ -103,7 +106,7 @@ public class TestSegmentedRaftLog {
for (int i = 0; i < size; i++) {
SimpleOperation m = new SimpleOperation("m" + (i + range.start));
entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- range.term, i + range.start);
+ range.term, i + range.start, clientId, callId);
out.write(entries[i]);
}
}
@@ -153,7 +156,7 @@ public class TestSegmentedRaftLog {
new SimpleOperation("m" + index) :
new SimpleOperation(stringSupplier.get());
eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- range.term, index));
+ range.term, index, clientId, callId));
}
}
return eList;