You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/01/21 18:20:13 UTC
[ignite-3] branch ignite-13885 updated: IGNITE-13885 Added the counter example.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push:
new 8f1c7a5 IGNITE-13885 Added the counter example.
8f1c7a5 is described below
commit 8f1c7a5894abc26a120d4c1d31952172dc3c36ef
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Thu Jan 21 21:19:56 2021 +0300
IGNITE-13885 Added the counter example.
---
modules/raft/pom.xml | 4 +-
.../sofa/jraft/core/ReadOnlyServiceImpl.java | 2 +-
.../com/alipay/sofa/jraft/rpc/CliRequests.java | 86 ++---------
.../sofa/jraft/rpc/MessageBuilderFactory.java | 45 +++++-
.../com/alipay/sofa/jraft/rpc/RpcRequests.java | 8 +-
.../rpc/message/CreateGetLeaderRequestImpl.java | 36 +++++
.../rpc/message/CreateGetLeaderResponseImpl.java | 26 ++++
.../rpc/message/DefaultMessageBuilderFactory.java | 88 ++++++++++-
.../sofa/jraft/rpc/message/GetFileRequestImpl.java | 66 ++++++++
.../jraft/rpc/message/GetFileResponseImpl.java | 48 ++++++
.../rpc/message/InstallSnapshotRequestImpl.java | 101 ++++++++++++
...eImpl.java => InstallSnapshotResponseImpl.java} | 6 +-
.../jraft/rpc/message/TimeoutNowRequestImpl.java | 20 +++
.../jraft/rpc/message/TimeoutNowResponseImpl.java | 2 +-
.../java/com/alipay/sofa/jraft/core/NodeTest.java | 59 ++++++-
.../sofa/jraft/core/ReadOnlyServiceTest.java | 14 +-
.../com/alipay/sofa/jraft/core/TestCluster.java | 15 ++
.../alipay/sofa/jraft/counter/CounterClient.java | 94 +++++++++++
.../alipay/sofa/jraft/counter/CounterClosure.java | 60 ++++++++
.../sofa/jraft/counter/CounterExampleTest.java | 128 +++++++++++++++
.../sofa/jraft/counter/CounterOperation.java | 62 ++++++++
.../alipay/sofa/jraft/counter/CounterServer.java | 147 ++++++++++++++++++
.../alipay/sofa/jraft/counter/CounterService.java | 37 +++++
.../sofa/jraft/counter/CounterServiceImpl.java | 119 ++++++++++++++
.../sofa/jraft/counter/CounterStateMachine.java | 171 +++++++++++++++++++++
.../sofa/jraft/counter/rpc/GetValueRequest.java | 42 +++++
.../counter/rpc/GetValueRequestProcessor.java | 57 +++++++
.../jraft/counter/rpc/IncrementAndGetRequest.java | 42 +++++
.../rpc/IncrementAndGetRequestProcessor.java | 57 +++++++
.../sofa/jraft/counter/rpc/ValueResponse.java | 92 +++++++++++
.../counter/snapshot/CounterSnapshotFile.java | 68 ++++++++
.../java/com/alipay/sofa/jraft/test/TestUtils.java | 4 +-
modules/raft/src/test/resources/log4j2.xml | 4 +-
33 files changed, 1708 insertions(+), 102 deletions(-)
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 781090f..7d91946 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -37,8 +37,8 @@
<dependencies>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>3.4.6</version>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java
index 8952fe9..f90445c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java
@@ -262,7 +262,7 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
// listen on lastAppliedLogIndex change events.
this.fsmCaller.addLastAppliedLogIndexListener(this);
- // start scanner
+ // start scanner TODO asch investigate, why it's needed ?
this.scheduledExecutorService.scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
return true;
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
index 38732c0..c95579f 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
@@ -48,7 +48,7 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createAddPeerResponse();
}
java.util.List<java.lang.String> getOldPeersList();
@@ -92,7 +92,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createRemovePeerRequest();
}
}
@@ -102,37 +102,19 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createRemovePeerResponse();
}
- /**
- * <code>repeated string old_peers = 1;</code>
- */
java.util.List<java.lang.String> getOldPeersList();
- /**
- * <code>repeated string old_peers = 1;</code>
- */
int getOldPeersCount();
- /**
- * <code>repeated string old_peers = 1;</code>
- */
java.lang.String getOldPeers(int index);
- /**
- * <code>repeated string new_peers = 2;</code>
- */
java.util.List<java.lang.String> getNewPeersList();
- /**
- * <code>repeated string new_peers = 2;</code>
- */
int getNewPeersCount();
- /**
- * <code>repeated string new_peers = 2;</code>
- */
java.lang.String getNewPeers(int index);
RpcRequests.ErrorResponse getErrorResponse();
@@ -147,29 +129,14 @@ public final class CliRequests {
}
public interface ChangePeersRequest extends Message {
- /**
- * <code>required string group_id = 1;</code>
- */
java.lang.String getGroupId();
- /**
- * <code>required string leader_id = 2;</code>
- */
java.lang.String getLeaderId();
- /**
- * <code>repeated string new_peers = 3;</code>
- */
java.util.List<java.lang.String> getNewPeersList();
- /**
- * <code>repeated string new_peers = 3;</code>
- */
int getNewPeersCount();
- /**
- * <code>repeated string new_peers = 3;</code>
- */
java.lang.String getNewPeers(int index);
public interface Builder {
@@ -183,7 +150,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createChangePeerRequest();
}
}
@@ -194,42 +161,21 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createChangePeerResponse();
}
- /**
- * <code>repeated string old_peers = 1;</code>
- */
java.util.List<java.lang.String> getOldPeersList();
- /**
- * <code>repeated string old_peers = 1;</code>
- */
int getOldPeersCount();
- /**
- * <code>repeated string old_peers = 1;</code>
- */
java.lang.String getOldPeers(int index);
- /**
- * <code>repeated string new_peers = 2;</code>
- */
java.util.List<java.lang.String> getNewPeersList();
- /**
- * <code>repeated string new_peers = 2;</code>
- */
int getNewPeersCount();
- /**
- * <code>repeated string new_peers = 2;</code>
- */
java.lang.String getNewPeers(int index);
- /**
- * <code>optional .jraft.ErrorResponse errorResponse = 99;</code>
- */
RpcRequests.ErrorResponse getErrorResponse();
public interface Builder {
@@ -255,7 +201,7 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createSnapshotRequest();
}
}
@@ -287,7 +233,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createResetPeerRequest();
}
}
@@ -311,7 +257,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createTransferLeaderRequest();
}
}
@@ -331,7 +277,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createGetLeaderRequest();
}
}
@@ -341,7 +287,7 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
}
java.lang.String getLeaderId();
@@ -375,7 +321,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createGetPeersRequest();
}
}
@@ -385,7 +331,7 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createGetPeersResponse();
}
java.util.List<java.lang.String> getPeersList();
@@ -433,7 +379,7 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createAddLearnersRequest();
}
}
@@ -459,7 +405,7 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createRemoveLearnersRequest();
}
}
@@ -491,7 +437,7 @@ public final class CliRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createResetLearnersRequest();
}
}
@@ -515,7 +461,7 @@ public final class CliRequests {
RpcRequests.ErrorResponse getErrorResponse();
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createLearnersOpResponse();
}
public interface Builder {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
index 4c119fc..cf992ec 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
@@ -9,8 +9,6 @@ import com.alipay.sofa.jraft.rpc.message.DefaultMessageBuilderFactory;
public interface MessageBuilderFactory {
public static MessageBuilderFactory DEFAULT = new DefaultMessageBuilderFactory();
- CliRequests.AddPeerRequest.Builder createAddPeerRequest();
-
LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta();
RpcRequests.PingRequest.Builder createPingRequest();
@@ -42,4 +40,47 @@ public interface MessageBuilderFactory {
LocalStorageOutter.LocalSnapshotPbMeta.Builder createLocalSnapshotMeta();
LocalStorageOutter.LocalSnapshotPbMeta.File.Builder createFile();
+
+ RpcRequests.InstallSnapshotRequest.Builder createInstallSnapshotRequest();
+
+ RpcRequests.InstallSnapshotResponse.Builder createInstallSnapshotResponse();
+
+ RpcRequests.GetFileRequest.Builder createGetFileRequest();
+
+ RpcRequests.GetFileResponse.Builder createGetFileResponse();
+
+ // CLI
+ CliRequests.AddPeerRequest.Builder createAddPeerRequest();
+
+ CliRequests.AddPeerResponse.Builder createAddPeerResponse();
+
+ CliRequests.RemovePeerRequest.Builder createRemovePeerRequest();
+
+ CliRequests.RemovePeerResponse.Builder createRemovePeerResponse();
+
+ CliRequests.ChangePeersRequest.Builder createChangePeerRequest();
+
+ CliRequests.ChangePeersResponse.Builder createChangePeerResponse();
+
+ CliRequests.SnapshotRequest.Builder createSnapshotRequest();
+
+ CliRequests.ResetPeerRequest.Builder createResetPeerRequest();
+
+ CliRequests.TransferLeaderRequest.Builder createTransferLeaderRequest();
+
+ CliRequests.GetLeaderRequest.Builder createGetLeaderRequest();
+
+ CliRequests.GetLeaderResponse.Builder createGetLeaderResponse();
+
+ CliRequests.GetPeersRequest.Builder createGetPeersRequest();
+
+ CliRequests.GetPeersResponse.Builder createGetPeersResponse();
+
+ CliRequests.AddLearnersRequest.Builder createAddLearnersRequest();
+
+ CliRequests.RemoveLearnersRequest.Builder createRemoveLearnersRequest();
+
+ CliRequests.ResetLearnersRequest.Builder createResetLearnersRequest();
+
+ CliRequests.LearnersOpResponse.Builder createLearnersOpResponse();
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
index eb6ac4f..c657307 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
@@ -73,7 +73,7 @@ public final class RpcRequests {
public interface InstallSnapshotRequest extends Message {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createInstallSnapshotRequest();
}
java.lang.String getGroupId();
@@ -111,7 +111,7 @@ public final class RpcRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createInstallSnapshotResponse();
}
/**
@@ -373,7 +373,7 @@ public final class RpcRequests {
public interface GetFileRequest extends Message {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createGetFileRequest();
}
long getReaderId();
@@ -415,7 +415,7 @@ public final class RpcRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createGetFileResponse();
}
boolean getEof();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/CreateGetLeaderRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/CreateGetLeaderRequestImpl.java
new file mode 100644
index 0000000..5e3cc97
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/CreateGetLeaderRequestImpl.java
@@ -0,0 +1,36 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.CliRequests;
+
+public class CreateGetLeaderRequestImpl implements CliRequests.GetLeaderRequest, CliRequests.GetLeaderRequest.Builder {
+ private String groupId;
+ private String peerId;
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getPeerId() {
+ return peerId;
+ }
+
+ @Override public boolean hasPeerId() {
+ return peerId != null;
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setPeerId(String peerId) {
+ this.peerId = peerId;
+
+ return this;
+ }
+
+ @Override public CliRequests.GetLeaderRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/CreateGetLeaderResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/CreateGetLeaderResponseImpl.java
new file mode 100644
index 0000000..b4a024d
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/CreateGetLeaderResponseImpl.java
@@ -0,0 +1,26 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.CliRequests;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+public class CreateGetLeaderResponseImpl implements CliRequests.GetLeaderResponse, CliRequests.GetLeaderResponse.Builder {
+ private String leaderId;
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public RpcRequests.ErrorResponse getErrorResponse() {
+ return null;
+ }
+
+ @Override public CliRequests.GetLeaderResponse build() {
+ return this;
+ }
+
+ @Override public Builder setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
index e415c5e..d92ff6f 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
@@ -8,10 +8,6 @@ import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
import com.alipay.sofa.jraft.rpc.RpcRequests;
public class DefaultMessageBuilderFactory implements MessageBuilderFactory {
- @Override public CliRequests.AddPeerRequest.Builder createAddPeerRequest() {
- return new AddPeerRequestImpl();
- }
-
@Override public LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta() {
return new LocalFileMetaImpl();
}
@@ -75,4 +71,88 @@ public class DefaultMessageBuilderFactory implements MessageBuilderFactory {
@Override public LocalStorageOutter.LocalSnapshotPbMeta.File.Builder createFile() {
return new LocalSnapshotMetaFileImpl();
}
+
+ @Override public RpcRequests.InstallSnapshotRequest.Builder createInstallSnapshotRequest() {
+ return new InstallSnapshotRequestImpl();
+ }
+
+ @Override public RpcRequests.InstallSnapshotResponse.Builder createInstallSnapshotResponse() {
+ return new InstallSnapshotResponseImpl();
+ }
+
+ @Override public RpcRequests.GetFileRequest.Builder createGetFileRequest() {
+ return new GetFileRequestImpl();
+ }
+
+ @Override public RpcRequests.GetFileResponse.Builder createGetFileResponse() {
+ return new GetFileResponseImpl();
+ }
+
+ @Override public CliRequests.AddPeerRequest.Builder createAddPeerRequest() {
+ return new AddPeerRequestImpl();
+ }
+
+ @Override public CliRequests.AddPeerResponse.Builder createAddPeerResponse() {
+ return null;
+ }
+
+ @Override public CliRequests.RemovePeerRequest.Builder createRemovePeerRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.RemovePeerResponse.Builder createRemovePeerResponse() {
+ return null;
+ }
+
+ @Override public CliRequests.ChangePeersRequest.Builder createChangePeerRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.ChangePeersResponse.Builder createChangePeerResponse() {
+ return null;
+ }
+
+ @Override public CliRequests.SnapshotRequest.Builder createSnapshotRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.ResetPeerRequest.Builder createResetPeerRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.TransferLeaderRequest.Builder createTransferLeaderRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.GetLeaderRequest.Builder createGetLeaderRequest() {
+ return new CreateGetLeaderRequestImpl();
+ }
+
+ @Override public CliRequests.GetLeaderResponse.Builder createGetLeaderResponse() {
+ return new CreateGetLeaderResponseImpl();
+ }
+
+ @Override public CliRequests.GetPeersRequest.Builder createGetPeersRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.GetPeersResponse.Builder createGetPeersResponse() {
+ return null;
+ }
+
+ @Override public CliRequests.AddLearnersRequest.Builder createAddLearnersRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.RemoveLearnersRequest.Builder createRemoveLearnersRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.ResetLearnersRequest.Builder createResetLearnersRequest() {
+ return null;
+ }
+
+ @Override public CliRequests.LearnersOpResponse.Builder createLearnersOpResponse() {
+ return null;
+ }
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/GetFileRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/GetFileRequestImpl.java
new file mode 100644
index 0000000..d4d1a33
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/GetFileRequestImpl.java
@@ -0,0 +1,66 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+public class GetFileRequestImpl implements RpcRequests.GetFileRequest, RpcRequests.GetFileRequest.Builder {
+ private long readerId;
+ private String filename;
+ private long count;
+ private long offset;
+ private boolean readPartly;
+
+
+ @Override public long getReaderId() {
+ return readerId;
+ }
+
+ @Override public String getFilename() {
+ return filename;
+ }
+
+ @Override public long getCount() {
+ return count;
+ }
+
+ @Override public long getOffset() {
+ return offset;
+ }
+
+ @Override public boolean getReadPartly() {
+ return readPartly;
+ }
+
+ @Override public RpcRequests.GetFileRequest build() {
+ return this;
+ }
+
+ @Override public Builder setCount(long cnt) {
+ this.count = cnt;
+
+ return this;
+ }
+
+ @Override public Builder setOffset(long offset) {
+ this.offset = offset;
+
+ return this;
+ }
+
+ @Override public Builder setReadPartly(boolean readPartly) {
+ this.readPartly = readPartly;
+
+ return this;
+ }
+
+ @Override public Builder setFilename(String fileName) {
+ this.filename = fileName;
+
+ return this;
+ }
+
+ @Override public Builder setReaderId(long readerId) {
+ this.readerId = readerId;
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/GetFileResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/GetFileResponseImpl.java
new file mode 100644
index 0000000..f84e376
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/GetFileResponseImpl.java
@@ -0,0 +1,48 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+import com.alipay.sofa.jraft.util.ByteString;
+
+public class GetFileResponseImpl implements RpcRequests.GetFileResponse, RpcRequests.GetFileResponse.Builder {
+ private boolean eof;
+ private long readSize;
+ private ByteString data;
+
+ @Override public boolean getEof() {
+ return eof;
+ }
+
+ @Override public long getReadSize() {
+ return readSize;
+ }
+
+ @Override public RpcRequests.ErrorResponse getErrorResponse() {
+ return null;
+ }
+
+ @Override public ByteString getData() {
+ return data;
+ }
+
+ @Override public RpcRequests.GetFileResponse build() {
+ return this;
+ }
+
+ @Override public Builder setReadSize(int read) {
+ this.readSize = read;
+
+ return this;
+ }
+
+ @Override public Builder setEof(boolean eof) {
+ this.eof = eof;
+
+ return this;
+ }
+
+ @Override public Builder setData(ByteString data) {
+ this.data = data;
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/InstallSnapshotRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/InstallSnapshotRequestImpl.java
new file mode 100644
index 0000000..8168d4b
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/InstallSnapshotRequestImpl.java
@@ -0,0 +1,101 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.entity.RaftOutter;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+public class InstallSnapshotRequestImpl implements RpcRequests.InstallSnapshotRequest, RpcRequests.InstallSnapshotRequest.Builder {
+ private String groupId;
+ private String serverId;
+ private String peerId;
+ private long term;
+ private RaftOutter.SnapshotMeta meta;
+ private String uri;
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getServerId() {
+ return serverId;
+ }
+
+ @Override public String getPeerId() {
+ return peerId;
+ }
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public RaftOutter.SnapshotMeta getMeta() {
+ return meta;
+ }
+
+ @Override public String getUri() {
+ return uri;
+ }
+
+ @Override public RpcRequests.InstallSnapshotRequest build() {
+ return this;
+ }
+
+ @Override public Builder setTerm(long term) {
+ this.term = term;
+
+ return this;
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setServerId(String serverId) {
+ this.serverId = serverId;
+
+ return this;
+ }
+
+ @Override public Builder setPeerId(String peerId) {
+ this.peerId = peerId;
+
+ return this;
+ }
+
+ @Override public Builder setMeta(RaftOutter.SnapshotMeta meta) {
+ this.meta = meta;
+
+ return this;
+ }
+
+ @Override public Builder setUri(String uri) {
+ this.uri = uri;
+
+ return this;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ InstallSnapshotRequestImpl that = (InstallSnapshotRequestImpl) o;
+
+ if (term != that.term) return false;
+ if (!groupId.equals(that.groupId)) return false;
+ if (!serverId.equals(that.serverId)) return false;
+ if (!peerId.equals(that.peerId)) return false;
+ if (!meta.equals(that.meta)) return false;
+ return uri.equals(that.uri);
+ }
+
+ @Override public int hashCode() {
+ int result = groupId.hashCode();
+ result = 31 * result + serverId.hashCode();
+ result = 31 * result + peerId.hashCode();
+ result = 31 * result + (int) (term ^ (term >>> 32));
+ result = 31 * result + meta.hashCode();
+ result = 31 * result + uri.hashCode();
+ return result;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/InstallSnapshotResponseImpl.java
similarity index 72%
copy from modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
copy to modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/InstallSnapshotResponseImpl.java
index da441ff..9eb6bce 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/InstallSnapshotResponseImpl.java
@@ -2,7 +2,7 @@ package com.alipay.sofa.jraft.rpc.message;
import com.alipay.sofa.jraft.rpc.RpcRequests;
-class TimeoutNowResponseImpl implements RpcRequests.TimeoutNowResponse, RpcRequests.TimeoutNowResponse.Builder {
+public class InstallSnapshotResponseImpl implements RpcRequests.InstallSnapshotResponse, RpcRequests.InstallSnapshotResponse.Builder {
private long term;
private boolean success;
@@ -18,12 +18,12 @@ class TimeoutNowResponseImpl implements RpcRequests.TimeoutNowResponse, RpcReque
return null;
}
- @Override public RpcRequests.TimeoutNowResponse build() {
+ @Override public RpcRequests.InstallSnapshotResponse build() {
return this;
}
@Override public Builder setTerm(long currTerm) {
- this.term = term;
+ this.term = currTerm;
return this;
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java
index af0be57..f1aaa9c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java
@@ -51,4 +51,24 @@ class TimeoutNowRequestImpl implements RpcRequests.TimeoutNowRequest, RpcRequest
return this;
}
+
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimeoutNowRequestImpl that = (TimeoutNowRequestImpl) o;
+
+ if (term != that.term) return false;
+ if (!groupId.equals(that.groupId)) return false;
+ if (!serverId.equals(that.serverId)) return false;
+ return peerId.equals(that.peerId);
+ }
+
+ @Override public int hashCode() {
+ int result = groupId.hashCode();
+ result = 31 * result + serverId.hashCode();
+ result = 31 * result + peerId.hashCode();
+ result = 31 * result + (int) (term ^ (term >>> 32));
+ return result;
+ }
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
index da441ff..a628420 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
@@ -23,7 +23,7 @@ class TimeoutNowResponseImpl implements RpcRequests.TimeoutNowResponse, RpcReque
}
@Override public Builder setTerm(long currTerm) {
- this.term = term;
+ this.term = currTerm;
return this;
}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
index 2fa28a1..e87e1c4 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
@@ -1941,8 +1941,10 @@ public class NodeTest {
cluster.stopAll();
}
+ /**
+ * @throws Exception
+ */
@Test
- @Ignore
public void testRestoreSnasphot() throws Exception {
final List<PeerId> peers = TestUtils.generatePeers(3);
@@ -1955,6 +1957,9 @@ public class NodeTest {
cluster.waitLeader();
// get leader
final Node leader = cluster.getLeader();
+
+ LOG.info("Leader: " + leader);
+
assertNotNull(leader);
// apply tasks to leader
this.sendTestTaskAndWait(leader);
@@ -1965,15 +1970,65 @@ public class NodeTest {
// stop leader
final Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
assertTrue(cluster.stop(leaderAddr));
- Thread.sleep(2000);
+ Thread.sleep(2000); // TODO asch while sleep is needed ?
+
+ // restart leader
+ cluster.waitLeader();
+ assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
+ assertTrue(cluster.start(leaderAddr));
+ cluster.ensureSame();
+ assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
+
+ cluster.stopAll();
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRestoreSnapshotWithDelta() throws Exception {
+ final List<PeerId> peers = TestUtils.generatePeers(3);
+
+ final TestCluster cluster = new TestCluster("unitest", this.dataPath, peers);
+
+ for (final PeerId peer : peers) {
+ assertTrue(cluster.start(peer.getEndpoint()));
+ }
+
+ cluster.waitLeader();
+ // get leader
+ final Node leader = cluster.getLeader();
+
+ LOG.info("Leader: " + leader);
+
+ assertNotNull(leader);
+ // apply tasks to leader
+ this.sendTestTaskAndWait(leader);
+
+ cluster.ensureSame();
+ triggerLeaderSnapshot(cluster, leader);
+
+ // stop leader
+ final Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
+ assertTrue(cluster.stop(leaderAddr));
+ Thread.sleep(2000); // TODO asch while sleep is needed ?
// restart leader
cluster.waitLeader();
+
+ sendTestTaskAndWait(cluster.getLeader(), 10, RaftError.SUCCESS);
+
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
assertTrue(cluster.start(leaderAddr));
+
+ Node oldLeader = cluster.getNode(leaderAddr);
+
cluster.ensureSame();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
+ MockStateMachine fsm = (MockStateMachine) oldLeader.getOptions().getFsm();
+ assertEquals(1, fsm.getLoadSnapshotTimes());
+
cluster.stopAll();
}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java
index 1de66ce..7fccdc2 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java
@@ -93,7 +93,7 @@ public class ReadOnlyServiceTest {
});
this.readOnlyServiceImpl.flush();
Mockito.verify(this.node).handleReadIndexRequest(Mockito.argThat(new ArgumentMatcher<ReadIndexRequest>() {
- @Override public boolean matches(ReadIndexRequest argument) {
+ @Override public boolean matches(Object argument) {
if (argument != null) {
final ReadIndexRequest req = (ReadIndexRequest) argument;
return req.getGroupId().equals("test") && req.getServerId().equals("localhost:8081:0")
@@ -125,9 +125,7 @@ public class ReadOnlyServiceTest {
final ArgumentCaptor<RpcResponseClosure> closureCaptor = ArgumentCaptor.forClass(RpcResponseClosure.class);
Mockito.verify(this.node).handleReadIndexRequest(Mockito.argThat(new ArgumentMatcher<ReadIndexRequest>() {
-
- @Override
- public boolean matches(final ReadIndexRequest argument) {
+ @Override public boolean matches(Object argument) {
if (argument != null) {
final ReadIndexRequest req = (ReadIndexRequest) argument;
return req.getGroupId().equals("test") && req.getServerId().equals("localhost:8081:0")
@@ -172,9 +170,7 @@ public class ReadOnlyServiceTest {
final ArgumentCaptor<RpcResponseClosure> closureCaptor = ArgumentCaptor.forClass(RpcResponseClosure.class);
Mockito.verify(this.node).handleReadIndexRequest(Mockito.argThat(new ArgumentMatcher<ReadIndexRequest>() {
-
- @Override
- public boolean matches(final ReadIndexRequest argument) {
+ @Override public boolean matches(Object argument) {
if (argument != null) {
final ReadIndexRequest req = (ReadIndexRequest) argument;
return req.getGroupId().equals("test") && req.getServerId().equals("localhost:8081:0")
@@ -217,9 +213,7 @@ public class ReadOnlyServiceTest {
final ArgumentCaptor<RpcResponseClosure> closureCaptor = ArgumentCaptor.forClass(RpcResponseClosure.class);
Mockito.verify(this.node).handleReadIndexRequest(Mockito.argThat(new ArgumentMatcher<ReadIndexRequest>() {
-
- @Override
- public boolean matches(final ReadIndexRequest argument) {
+ @Override public boolean matches(Object argument) {
if (argument != null) {
final ReadIndexRequest req = (ReadIndexRequest) argument;
return req.getGroupId().equals("test") && req.getServerId().equals("localhost:8081:0")
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java
index ffbc985..8d20b0d 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java
@@ -258,6 +258,21 @@ public class TestCluster {
return false;
}
+    /**
+     * Looks up a node of this cluster by the endpoint of its server id.
+     *
+     * @param endpoint endpoint to search for
+     * @return the first node whose server endpoint equals {@code endpoint},
+     *         or {@code null} when no node matches
+     */
+    public Node getNode(Endpoint endpoint) {
+        Node found = null;
+
+        this.lock.lock();
+        try {
+            for (NodeImpl candidate : nodes) {
+                if (candidate.getServerId().getEndpoint().equals(endpoint)) {
+                    found = candidate;
+                    break;
+                }
+            }
+        } finally {
+            this.lock.unlock();
+        }
+
+        return found;
+    }
+
+
public MockStateMachine getFsmByPeer(final PeerId peer) {
this.lock.lock();
try {
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterClient.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterClient.java
new file mode 100644
index 0000000..ab8d873
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterClient.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter;
+
+import com.alipay.sofa.jraft.RouteTable;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.counter.rpc.IncrementAndGetRequest;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.option.CliOptions;
+import com.alipay.sofa.jraft.rpc.InvokeCallback;
+import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+/**
+ * Command-line client for the counter example: refreshes the leader of a raft group through the
+ * route table and sends {@link IncrementAndGetRequest}s to it.
+ */
+public class CounterClient {
+
+    /**
+     * Runs the client.
+     *
+     * @param args exactly two arguments: the raft group id and the comma-separated peer list
+     * @throws Exception if the peer list cannot be parsed, the leader cannot be refreshed,
+     *                   or sending a request fails
+     */
+    public static void main(final String[] args) throws Exception {
+        if (args.length != 2) {
+            // The usage text names this class's actual package so the printed command is runnable.
+            System.out.println("Usage : java com.alipay.sofa.jraft.counter.CounterClient {groupId} {conf}");
+            System.out
+                .println("Example: java com.alipay.sofa.jraft.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
+            System.exit(1);
+        }
+        final String groupId = args[0];
+        final String confStr = args[1];
+
+        final Configuration conf = new Configuration();
+        if (!conf.parse(confStr)) {
+            throw new IllegalArgumentException("Fail to parse conf:" + confStr);
+        }
+
+        RouteTable.getInstance().updateConfiguration(groupId, conf);
+
+        final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+        cliClientService.init(new CliOptions());
+
+        if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
+            throw new IllegalStateException("Refresh leader failed");
+        }
+
+        final PeerId leader = RouteTable.getInstance().selectLeader(groupId);
+        System.out.println("Leader is " + leader);
+        final int n = 1;
+        final CountDownLatch latch = new CountDownLatch(n);
+        final long start = System.currentTimeMillis();
+        for (int i = 0; i < n; i++) {
+            incrementAndGet(cliClientService, leader, i, latch);
+        }
+        latch.await();
+        System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
+        System.exit(0);
+    }
+
+    /**
+     * Sends one asynchronous {@link IncrementAndGetRequest} to the leader and counts the latch
+     * down once the callback has reported the outcome.
+     *
+     * @param cliClientService client service whose RPC client is used for the call
+     * @param leader           peer the request is sent to
+     * @param delta            value to add to the counter
+     * @param latch            latch counted down exactly once per completed request
+     */
+    private static void incrementAndGet(final CliClientServiceImpl cliClientService, final PeerId leader,
+                                        final long delta, CountDownLatch latch) throws RemotingException,
+                                                                                InterruptedException {
+        final IncrementAndGetRequest request = new IncrementAndGetRequest();
+        request.setDelta(delta);
+        cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
+
+            @Override
+            public void complete(Object result, Throwable err) {
+                try {
+                    if (err == null) {
+                        System.out.println("incrementAndGet result:" + result);
+                    } else {
+                        err.printStackTrace();
+                    }
+                } finally {
+                    // Release the waiter on every path, even if reporting the outcome throws.
+                    latch.countDown();
+                }
+            }
+
+            @Override
+            public Executor executor() {
+                // NOTE(review): null presumably selects the RPC client's default executor - confirm.
+                return null;
+            }
+        }, 5000);
+    }
+
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterClosure.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterClosure.java
new file mode 100644
index 0000000..77cb552
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterClosure.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter;
+
+import com.alipay.sofa.jraft.Closure;
+import com.alipay.sofa.jraft.counter.rpc.ValueResponse;
+
+/**
+ * Base closure for counter requests. It carries the {@link CounterOperation} being applied and
+ * the {@link ValueResponse} built for it; {@link #success(long)} and
+ * {@link #failure(String, String)} populate that response.
+ *
+ * @author likun (saimu.msm@antfin.com)
+ */
+public abstract class CounterClosure implements Closure {
+
+    /** Response built by {@link #success(long)} or {@link #failure(String, String)}. */
+    private ValueResponse valueResponse;
+
+    /** Operation this closure was submitted with, if any. */
+    private CounterOperation counterOperation;
+
+    /** @return the operation attached to this closure, or {@code null} if none was set */
+    public CounterOperation getCounterOperation() {
+        return this.counterOperation;
+    }
+
+    /** @param counterOperation operation to attach to this closure */
+    public void setCounterOperation(CounterOperation counterOperation) {
+        this.counterOperation = counterOperation;
+    }
+
+    /** @return the response stored so far, or {@code null} if none was set */
+    public ValueResponse getValueResponse() {
+        return this.valueResponse;
+    }
+
+    /** @param valueResponse response to store on this closure */
+    public void setValueResponse(ValueResponse valueResponse) {
+        this.valueResponse = valueResponse;
+    }
+
+    /**
+     * Stores an unsuccessful response.
+     *
+     * @param errorMsg error message to report
+     * @param redirect redirect value placed in the response
+     */
+    protected void failure(final String errorMsg, final String redirect) {
+        final ValueResponse failed = new ValueResponse();
+        failed.setSuccess(false);
+        failed.setErrorMsg(errorMsg);
+        failed.setRedirect(redirect);
+        this.valueResponse = failed;
+    }
+
+    /**
+     * Stores a successful response.
+     *
+     * @param value counter value to report
+     */
+    protected void success(final long value) {
+        final ValueResponse ok = new ValueResponse();
+        ok.setValue(value);
+        ok.setSuccess(true);
+        this.valueResponse = ok;
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterExampleTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterExampleTest.java
new file mode 100644
index 0000000..d03c2f2
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterExampleTest.java
@@ -0,0 +1,128 @@
+package com.alipay.sofa.jraft.counter;
+
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.RouteTable;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.core.NodeImpl;
+import com.alipay.sofa.jraft.core.NodeTest;
+import com.alipay.sofa.jraft.core.TestCluster;
+import com.alipay.sofa.jraft.counter.rpc.IncrementAndGetRequest;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.option.CliOptions;
+import com.alipay.sofa.jraft.rpc.InvokeCallback;
+import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
+import com.alipay.sofa.jraft.test.TestUtils;
+import com.alipay.sofa.jraft.util.Utils;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Starts a three-node counter raft group on localhost and sends an
+ * {@link IncrementAndGetRequest} to its leader.
+ */
+public class CounterExampleTest {
+    static final Logger LOG = LoggerFactory.getLogger(CounterExampleTest.class);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /** Temporary directory handed to the counter servers; deleted in {@link #teardown()}. */
+    private String dataPath;
+
+    @Before
+    public void setup() throws Exception {
+        System.out.println(">>>>>>>>>>>>>>> Start test method: " + this.testName.getMethodName());
+        this.dataPath = TestUtils.mkTempDir();
+        new File(this.dataPath).mkdirs();
+        // JUnit convention is (expected, actual); the swapped order produces a misleading message.
+        assertEquals(0, NodeImpl.GLOBAL_NUM_NODES.get());
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (NodeImpl.GLOBAL_NUM_NODES.get() > 0) {
+            // Give nodes that are still shutting down some time before asserting.
+            Thread.sleep(5000);
+            assertEquals(0, NodeImpl.GLOBAL_NUM_NODES.get());
+        }
+        assertTrue(Utils.delete(new File(this.dataPath)));
+        NodeManager.getInstance().clear();
+
+        System.out.println(">>>>>>>>>>>>>>> End test method: " + this.testName.getMethodName());
+    }
+
+    /**
+     * Starts three counter servers, resolves the leader and issues increment requests.
+     * Fails if a request does not complete in time or completes with an error.
+     */
+    @Test
+    public void testCounter() throws IOException, InterruptedException, TimeoutException, RemotingException {
+
+        try {
+            String initConfStr = "127.0.0.1:8080,127.0.0.1:8081,127.0.0.1:8082";
+
+            String groupId = "counter";
+
+            // Create initial topology. Started servers are tracked by CounterServer and stopped in finally.
+            CounterServer.start(dataPath, groupId, "127.0.0.1:8080", initConfStr);
+            CounterServer.start(dataPath, groupId, "127.0.0.1:8081", initConfStr);
+            CounterServer.start(dataPath, groupId, "127.0.0.1:8082", initConfStr);
+
+            // Wait before asking for the leader (election timeout is 1 second in CounterServer.start).
+            Thread.sleep(3000);
+
+            // Create client.
+            final Configuration conf = new Configuration();
+
+            assertTrue(conf.parse(initConfStr));
+
+            RouteTable.getInstance().updateConfiguration(groupId, conf);
+
+            // NOTE(review): the client service is never shut down here - confirm whether it should be.
+            final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+            cliClientService.init(new CliOptions());
+
+            if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
+                throw new IllegalStateException("Refresh leader failed");
+            }
+
+            final PeerId leader = RouteTable.getInstance().selectLeader(groupId);
+            System.out.println("Leader is " + leader);
+            final int n = 1;
+            final CountDownLatch latch = new CountDownLatch(n);
+            // First error reported by any callback; stays null when every request succeeds.
+            final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
+                new java.util.concurrent.atomic.AtomicReference<>();
+            final long start = System.currentTimeMillis();
+            for (int i = 0; i < n; i++) {
+                incrementAndGet(cliClientService, leader, i, latch, failure);
+            }
+            // Bounded wait: each request has a 5 second RPC timeout, so a hang fails the test.
+            assertTrue("Timed out waiting for incrementAndGet responses",
+                latch.await(10, java.util.concurrent.TimeUnit.SECONDS));
+            assertTrue("incrementAndGet failed: " + failure.get(), failure.get() == null);
+            System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
+        }
+        finally {
+            CounterServer.stopAll();
+        }
+    }
+
+    /**
+     * Sends one asynchronous {@link IncrementAndGetRequest} to the leader.
+     *
+     * @param cliClientService client service whose RPC client is used for the call
+     * @param leader           peer the request is sent to
+     * @param delta            value to add to the counter
+     * @param latch            latch counted down once per completed request
+     * @param failure          receives the first error reported by a callback
+     */
+    private static void incrementAndGet(final CliClientServiceImpl cliClientService, final PeerId leader,
+                                        final long delta, CountDownLatch latch,
+                                        final java.util.concurrent.atomic.AtomicReference<Throwable> failure)
+        throws RemotingException, InterruptedException {
+        final IncrementAndGetRequest request = new IncrementAndGetRequest();
+        request.setDelta(delta);
+
+        cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
+
+            @Override
+            public void complete(Object result, Throwable err) {
+                try {
+                    if (err == null) {
+                        System.out.println("incrementAndGet result:" + result);
+                    } else {
+                        LOG.error("incrementAndGet failed", err);
+                        failure.compareAndSet(null, err);
+                    }
+                } finally {
+                    latch.countDown();
+                }
+            }
+
+            @Override
+            public Executor executor() {
+                // NOTE(review): null presumably selects the RPC client's default executor - confirm.
+                return null;
+            }
+        }, 5000);
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterOperation.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterOperation.java
new file mode 100644
index 0000000..acf41c3
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter;
+
+import java.io.Serializable;
+
+/**
+ * Serializable description of a single counter command: an operation code plus the delta
+ * used by increments.
+ *
+ * @author likun (saimu.msm@antfin.com)
+ */
+public class CounterOperation implements Serializable {
+
+    private static final long serialVersionUID = -6597003954824547294L;
+
+    /** Operation code: read the value. */
+    public static final byte GET = 0x01;
+    /** Operation code: add a delta, then read the value. */
+    public static final byte INCREMENT = 0x02;
+
+    /** Operation code, normally {@link #GET} or {@link #INCREMENT}. */
+    private byte op;
+    /** Amount to add; zero for operations created without a delta. */
+    private long delta;
+
+    /**
+     * Creates an operation without a delta (delta is zero).
+     *
+     * @param op operation code
+     */
+    public CounterOperation(byte op) {
+        this.op = op;
+        this.delta = 0;
+    }
+
+    /**
+     * Creates an operation with an explicit delta.
+     *
+     * @param op    operation code
+     * @param delta amount to add
+     */
+    public CounterOperation(byte op, long delta) {
+        this.op = op;
+        this.delta = delta;
+    }
+
+    /** @return a new {@link #GET} operation with a zero delta */
+    public static CounterOperation createGet() {
+        return new CounterOperation(GET, 0);
+    }
+
+    /**
+     * @param delta amount to add
+     * @return a new {@link #INCREMENT} operation carrying {@code delta}
+     */
+    public static CounterOperation createIncrement(final long delta) {
+        return new CounterOperation(INCREMENT, delta);
+    }
+
+    /** @return the operation code */
+    public byte getOp() {
+        return this.op;
+    }
+
+    /** @return the delta carried by this operation */
+    public long getDelta() {
+        return this.delta;
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterServer.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterServer.java
new file mode 100644
index 0000000..e6fa618
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterServer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.RaftGroupService;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.counter.rpc.GetValueRequestProcessor;
+import com.alipay.sofa.jraft.counter.rpc.IncrementAndGetRequestProcessor;
+import com.alipay.sofa.jraft.counter.rpc.ValueResponse;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
+import com.alipay.sofa.jraft.rpc.RpcServer;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Counter server that keeps a counter value in a raft group.
+ *
+ * <p>Servers created through {@link #start(String, String, String, String)} are tracked in a
+ * static registry so that {@link #stopAll()} can shut them all down.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 4:51:02 PM
+ */
+public class CounterServer {
+    /** Servers started via {@link #start}; removed again by {@link #shutdown()}. */
+    private static final Set<CounterServer> servers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private RaftGroupService raftGroupService;
+    private Node node;
+    private CounterStateMachine fsm;
+
+    /**
+     * Creates the RPC server, registers the counter request processors and starts the raft node.
+     *
+     * @param dataPath    directory under which the log, raft metadata and snapshot paths are placed
+     * @param groupId     raft group id
+     * @param serverId    peer id of this server; its endpoint is used for the RPC server
+     * @param nodeOptions node options; this constructor sets the state machine and storage URIs on it
+     * @throws IOException declared by the signature
+     */
+    public CounterServer(final String dataPath, final String groupId, final PeerId serverId,
+                         final NodeOptions nodeOptions) throws IOException {
+        // Initialize the data directory.
+        new File(dataPath).mkdirs();
+
+        // Raft RPC and business RPC share one RPC server here; they can also be kept separate.
+        final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint());
+        // Register the business processors. The service receives `this` before `fsm` and `node`
+        // are assigned, so it must not use them until construction has finished.
+        CounterService counterService = new CounterServiceImpl(this);
+        rpcServer.registerProcessor(new GetValueRequestProcessor(counterService));
+        rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService));
+        // Initialize the state machine.
+        this.fsm = new CounterStateMachine();
+        // Hand the state machine to the node options.
+        nodeOptions.setFsm(this.fsm);
+        // Configure storage paths.
+        // Log storage, required.
+        nodeOptions.setLogUri(dataPath + File.separator + "log");
+        // Raft metadata, required.
+        nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
+        // Snapshot, optional but generally recommended.
+        nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+        // Initialize the raft group service.
+        this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
+        // Start it.
+        this.node = this.raftGroupService.start();
+    }
+
+    /**
+     * Shuts down every server currently in the registry.
+     *
+     * @throws InterruptedException if interrupted while waiting for a node to stop
+     */
+    public static void stopAll() throws InterruptedException {
+        for (CounterServer server : servers) {
+            server.shutdown();
+        }
+    }
+
+    /**
+     * Shuts down this server's node, waits for it to finish and removes the server from the registry.
+     *
+     * @throws InterruptedException if interrupted while joining the node
+     */
+    public void shutdown() throws InterruptedException {
+        node.shutdown();
+        node.join();
+
+        servers.remove(this);
+    }
+
+    /** @return the counter state machine of this server */
+    public CounterStateMachine getFsm() {
+        return this.fsm;
+    }
+
+    /** @return the raft node of this server */
+    public Node getNode() {
+        return this.node;
+    }
+
+    /** @return the raft group service of this server */
+    public RaftGroupService RaftGroupService() {
+        return this.raftGroupService;
+    }
+
+    /**
+     * Builds an unsuccessful response that points the caller at the current leader.
+     *
+     * @return a response with {@code success == false}; the redirect is set only when this
+     *         node exists and reports a leader
+     */
+    public ValueResponse redirect() {
+        final ValueResponse response = new ValueResponse();
+        response.setSuccess(false);
+        if (this.node != null) {
+            final PeerId leader = this.node.getLeaderId();
+            if (leader != null) {
+                response.setRedirect(leader.toString());
+            }
+        }
+        return response;
+    }
+
+    /**
+     * Parses the arguments, starts a counter server and adds it to the registry.
+     *
+     * @param dataPath    directory for the server's raft storage
+     * @param groupId     raft group id
+     * @param serverIdStr peer id of the server to start, e.g. {@code 127.0.0.1:8081}
+     * @param initConfStr comma-separated initial peer list of the group
+     * @return the started server
+     * @throws IllegalArgumentException if {@code serverIdStr} or {@code initConfStr} cannot be parsed
+     * @throws IOException              propagated from the {@link CounterServer} constructor
+     */
+    public static CounterServer start(String dataPath, String groupId, String serverIdStr, String initConfStr) throws IOException {
+        final NodeOptions nodeOptions = new NodeOptions();
+        // For testing, tune parameters such as the snapshot interval.
+        // Set the election timeout to 1 second.
+        nodeOptions.setElectionTimeoutMs(1000);
+        // Keep the CLI service enabled (disableCli = false); the example clients presumably
+        // rely on it to refresh the leader.
+        nodeOptions.setDisableCli(false);
+        // Take a snapshot every 30 seconds.
+        nodeOptions.setSnapshotIntervalSecs(30);
+        // Parse the arguments.
+        final PeerId serverId = new PeerId();
+        if (!serverId.parse(serverIdStr)) {
+            throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
+        }
+        final Configuration initConf = new Configuration();
+        if (!initConf.parse(initConfStr)) {
+            throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
+        }
+        // Set the initial cluster configuration.
+        nodeOptions.setInitialConf(initConf);
+
+        // Start the server.
+        final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);
+        System.out.println("Started counter server at port:"
+                           + counterServer.getNode().getNodeId().getPeerId().getPort());
+
+        servers.add(counterServer);
+
+        return counterServer;
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterService.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterService.java
new file mode 100644
index 0000000..9302a4f
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter;
+
+/**
+ * Service contract of the counter example: read the counter or add to it. Results are
+ * delivered through the supplied {@link CounterClosure}.
+ *
+ * @author likun (saimu.msm@antfin.com)
+ */
+public interface CounterService {
+
+    /**
+     * Reads the current counter value.
+     *
+     * @param readOnlySafe when {@code true} the read is expected to be consistent
+     * @param closure      closure that receives the response
+     */
+    void get(boolean readOnlySafe, CounterClosure closure);
+
+    /**
+     * Adds {@code delta} to the counter and reports the resulting value.
+     *
+     * @param delta   amount to add
+     * @param closure closure that receives the response
+     */
+    void incrementAndGet(long delta, CounterClosure closure);
+}
\ No newline at end of file
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterServiceImpl.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterServiceImpl.java
new file mode 100644
index 0000000..143eb67
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterServiceImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import com.alipay.sofa.jraft.entity.Task;
+import com.alipay.sofa.jraft.error.RaftError;
+import com.alipay.sofa.jraft.util.BytesUtil;
+import com.alipay.sofa.jraft.util.Marshaller;
+import com.alipay.sofa.jraft.util.StringUtils;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link CounterService} backed by a {@link CounterServer}: reads are answered from the local
+ * state machine (optionally after a read-index round), writes are submitted to the raft node
+ * as tasks.
+ *
+ * @author likun (saimu.msm@antfin.com)
+ */
+public class CounterServiceImpl implements CounterService {
+    private static final Logger LOG = LoggerFactory.getLogger(CounterServiceImpl.class);
+
+    private final CounterServer counterServer;
+    /** Runs the fallback path taken when a read-index request fails. */
+    private final Executor readIndexExecutor;
+
+    /**
+     * @param counterServer server whose node and state machine this service operates on
+     */
+    public CounterServiceImpl(CounterServer counterServer) {
+        this.counterServer = counterServer;
+        this.readIndexExecutor = createReadIndexExecutor();
+    }
+
+    private Executor createReadIndexExecutor() {
+        return Executors.newCachedThreadPool();
+    }
+
+    /**
+     * Reads the counter. Without {@code readOnlySafe} the local value is returned right away;
+     * otherwise a read-index request is issued first, and if it fails the read is applied as a
+     * regular operation on the leader or rejected on a non-leader.
+     */
+    @Override
+    public void get(final boolean readOnlySafe, final CounterClosure closure) {
+        if (!readOnlySafe) {
+            replyWithLocalValue(closure);
+            return;
+        }
+
+        this.counterServer.getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+                if (status.isOk()) {
+                    replyWithLocalValue(closure);
+                    return;
+                }
+                // Read-index failed: continue on the service's own executor.
+                CounterServiceImpl.this.readIndexExecutor.execute(() -> {
+                    if (!isLeader()) {
+                        handleNotLeaderError(closure);
+                        return;
+                    }
+                    LOG.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
+                    applyOperation(CounterOperation.createGet(), closure);
+                });
+            }
+        });
+    }
+
+    /** Answers the closure successfully with the state machine's current value. */
+    private void replyWithLocalValue(final CounterClosure closure) {
+        closure.success(getValue());
+        closure.run(Status.OK());
+    }
+
+    private boolean isLeader() {
+        return this.counterServer.getFsm().isLeader();
+    }
+
+    private long getValue() {
+        return this.counterServer.getFsm().getValue();
+    }
+
+    private String getRedirect() {
+        return this.counterServer.redirect().getRedirect();
+    }
+
+    /** Submits an increment of {@code delta} to the raft group. */
+    @Override
+    public void incrementAndGet(final long delta, final CounterClosure closure) {
+        applyOperation(CounterOperation.createIncrement(delta), closure);
+    }
+
+    /**
+     * Marshals {@code op} into a task and applies it to the node. Non-leaders are rejected;
+     * any exception while building or applying the task is reported as an internal error.
+     */
+    private void applyOperation(final CounterOperation op, final CounterClosure closure) {
+        if (!isLeader()) {
+            handleNotLeaderError(closure);
+            return;
+        }
+
+        try {
+            closure.setCounterOperation(op);
+            final Task task = new Task();
+            final byte[] encoded = Marshaller.DEFAULT.marshall(op);
+            task.setData(ByteBuffer.wrap(encoded));
+            task.setDone(closure);
+            this.counterServer.getNode().apply(task);
+        } catch (Exception e) {
+            final String errorMsg = "Fail to encode CounterOperation";
+            LOG.error(errorMsg, e);
+            closure.failure(errorMsg, StringUtils.EMPTY);
+            closure.run(new Status(RaftError.EINTERNAL, errorMsg));
+        }
+    }
+
+    /** Fails the closure with a "not leader" error that carries the redirect target. */
+    private void handleNotLeaderError(final CounterClosure closure) {
+        closure.failure("Not leader.", getRedirect());
+        closure.run(new Status(RaftError.EPERM, "Not leader"));
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterStateMachine.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterStateMachine.java
new file mode 100644
index 0000000..be94696
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/CounterStateMachine.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter;
+
+import com.alipay.sofa.jraft.Closure;
+import com.alipay.sofa.jraft.Iterator;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.core.StateMachineAdapter;
+import com.alipay.sofa.jraft.counter.snapshot.CounterSnapshotFile;
+import com.alipay.sofa.jraft.error.RaftError;
+import com.alipay.sofa.jraft.error.RaftException;
+import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
+import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
+import com.alipay.sofa.jraft.util.Marshaller;
+import com.alipay.sofa.jraft.util.Utils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.alipay.sofa.jraft.counter.CounterOperation.GET;
+import static com.alipay.sofa.jraft.counter.CounterOperation.INCREMENT;
+
+/**
+ * Counter state machine: applies {@link CounterOperation}s to an in-memory counter and
+ * saves/loads the counter value as a snapshot file named {@code data}.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 4:52:31 PM
+ */
+public class CounterStateMachine extends StateMachineAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CounterStateMachine.class);
+
+    /**
+     * Counter value
+     */
+    private final AtomicLong value = new AtomicLong(0);
+    /**
+     * Leader term; set in {@link #onLeaderStart(long)} and reset to -1 in
+     * {@link #onLeaderStop(Status)}.
+     */
+    private final AtomicLong leaderTerm = new AtomicLong(-1);
+
+    /**
+     * @return {@code true} while the recorded leader term is positive
+     */
+    public boolean isLeader() {
+        return this.leaderTerm.get() > 0;
+    }
+
+    /**
+     * Returns current value.
+     */
+    public long getValue() {
+        return this.value.get();
+    }
+
+    /**
+     * Applies every entry of the iterator to the counter. When an entry carries a closure the
+     * operation is taken from it and the closure is completed with the resulting value;
+     * otherwise the operation is decoded from the entry data. Entries that cannot be decoded
+     * are logged and skipped.
+     */
+    @Override
+    public void onApply(final Iterator iter) {
+        while (iter.hasNext()) {
+            long current = 0;
+            CounterOperation counterOperation = null;
+
+            CounterClosure closure = null;
+            if (iter.done() != null) {
+                // This task is applied by this node, get value from closure to avoid additional parsing.
+                closure = (CounterClosure) iter.done();
+                counterOperation = closure.getCounterOperation();
+            } else {
+                // Have to parse the CounterOperation from this user log.
+                final ByteBuffer data = iter.getData();
+
+                try {
+                    // NOTE(review): array() assumes an array-backed buffer whose content starts at
+                    // offset 0 and fills the whole array - confirm for all log storages.
+                    counterOperation = Marshaller.DEFAULT.unmarshall(data.array());
+                } catch (final Exception e) {
+                    LOG.error("Fail to decode CounterOperation", e);
+                }
+            }
+            if (counterOperation != null) {
+                switch (counterOperation.getOp()) {
+                    case GET:
+                        current = this.value.get();
+                        LOG.info("Get value={} at logIndex={}", current, iter.getIndex());
+                        break;
+                    case INCREMENT:
+                        final long delta = counterOperation.getDelta();
+                        final long prev = this.value.get();
+                        current = this.value.addAndGet(delta);
+                        LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
+                        break;
+                    default:
+                        // Unknown op codes leave the counter untouched; make that visible in the log.
+                        LOG.warn("Unknown counter operation op={} at logIndex={}", counterOperation.getOp(),
+                            iter.getIndex());
+                        break;
+                }
+
+                if (closure != null) {
+                    closure.success(current);
+                    closure.run(Status.OK());
+                }
+            }
+            iter.next();
+        }
+    }
+
+    /**
+     * Saves the counter value into the {@code data} file under the writer's path. The value is
+     * captured before the work is handed to {@code Utils.runInThread}; {@code done} is run with
+     * OK on success and with an EIO status on failure.
+     */
+    @Override
+    public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
+        final long currVal = this.value.get();
+        Utils.runInThread(() -> {
+            final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
+            if (snapshot.save(currVal)) {
+                if (writer.addFile("data")) {
+                    done.run(Status.OK());
+                } else {
+                    done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
+                }
+            } else {
+                done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
+            }
+        });
+    }
+
+    @Override
+    public void onError(final RaftException e) {
+        LOG.error("Raft error: {}", e, e);
+    }
+
+    /**
+     * Loads the counter value from the {@code data} file of the snapshot.
+     *
+     * @return {@code true} if the value was loaded; {@code false} if this node is leader, the
+     *         snapshot has no {@code data} file, or reading it fails
+     */
+    @Override
+    public boolean onSnapshotLoad(final SnapshotReader reader) {
+        if (isLeader()) {
+            LOG.warn("Leader is not supposed to load snapshot");
+            return false;
+        }
+        if (reader.getFileMeta("data") == null) {
+            LOG.error("Fail to find data file in {}", reader.getPath());
+            return false;
+        }
+        final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
+        try {
+            this.value.set(snapshot.load());
+            return true;
+        } catch (final IOException e) {
+            // Pass the exception as the last argument so the cause and stack trace are logged.
+            LOG.error("Fail to load snapshot from {}", snapshot.getPath(), e);
+            return false;
+        }
+
+    }
+
+    @Override
+    public void onLeaderStart(final long term) {
+        this.leaderTerm.set(term);
+        super.onLeaderStart(term);
+
+    }
+
+    @Override
+    public void onLeaderStop(final Status status) {
+        this.leaderTerm.set(-1);
+        super.onLeaderStop(status);
+    }
+
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/GetValueRequest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/GetValueRequest.java
new file mode 100644
index 0000000..6b7218b
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/GetValueRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter.rpc;
+
+import com.alipay.sofa.jraft.rpc.Message;
+import java.io.Serializable;
+
+/**
+ * Request for the latest counter value.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 4:54:17 PM
+ */
+public class GetValueRequest implements Message {
+
+    private static final long serialVersionUID = 9218253805003988802L;
+
+    /** Flag handed to the counter service's get call; defaults to {@code true}. */
+    private boolean readOnlySafe = true;
+
+    /** @return the current value of the read-only-safe flag */
+    public boolean isReadOnlySafe() {
+        return this.readOnlySafe;
+    }
+
+    /** @param readOnlySafe new value of the read-only-safe flag */
+    public void setReadOnlySafe(boolean readOnlySafe) {
+        this.readOnlySafe = readOnlySafe;
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/GetValueRequestProcessor.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/GetValueRequestProcessor.java
new file mode 100644
index 0000000..df7b78e
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/GetValueRequestProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter.rpc;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.counter.CounterClosure;
+import com.alipay.sofa.jraft.counter.CounterService;
+import com.alipay.sofa.jraft.rpc.RpcContext;
+import com.alipay.sofa.jraft.rpc.RpcProcessor;
+
+/**
+ * RPC processor for {@link GetValueRequest}: delegates the read to a {@link CounterService}
+ * and replies through the RPC context once the service runs the supplied closure.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 5:48:33 PM
+ */
+public class GetValueRequestProcessor implements RpcProcessor<GetValueRequest> {
+
+    /** Service every incoming request is delegated to. */
+    private final CounterService counterService;
+
+    /**
+     * @param counterService service that performs the read on behalf of this processor
+     */
+    public GetValueRequestProcessor(CounterService counterService) {
+        this.counterService = counterService;
+    }
+
+    /**
+     * Builds a closure that sends its own value response back via {@code rpcCtx} when run,
+     * then passes it to the counter service together with the request's
+     * {@code readOnlySafe} option.
+     *
+     * @param rpcCtx  context used to send the response
+     * @param request the incoming get-value request
+     */
+    @Override
+    public void handleRequest(final RpcContext rpcCtx, final GetValueRequest request) {
+        final CounterClosure replyWhenDone = new CounterClosure() {
+            @Override
+            public void run(Status status) {
+                // The status argument is not inspected here; whatever response the closure
+                // holds at this point is sent as-is.
+                rpcCtx.sendResponse(getValueResponse());
+            }
+        };
+
+        this.counterService.get(request.isReadOnlySafe(), replyWhenDone);
+    }
+
+    /**
+     * @return the fully qualified class name of {@link GetValueRequest}
+     */
+    @Override
+    public String interest() {
+        return GetValueRequest.class.getName();
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/IncrementAndGetRequest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/IncrementAndGetRequest.java
new file mode 100644
index 0000000..f98311f
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/IncrementAndGetRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter.rpc;
+
+import com.alipay.sofa.jraft.rpc.Message;
+import java.io.Serializable;
+
+/**
+ * Request message asking to add a delta to the counter and return the resulting value.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 4:53:22 PM
+ */
+public class IncrementAndGetRequest implements Message {
+
+    private static final long serialVersionUID = -5623664785560971849L;
+
+    /** Amount carried by the request; {@code 0} until set. */
+    private long delta;
+
+    /**
+     * @return the delta carried by this request
+     */
+    public long getDelta() {
+        return this.delta;
+    }
+
+    /**
+     * @param delta the delta this request should carry
+     */
+    public void setDelta(final long delta) {
+        this.delta = delta;
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/IncrementAndGetRequestProcessor.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/IncrementAndGetRequestProcessor.java
new file mode 100644
index 0000000..5848547
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/IncrementAndGetRequestProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter.rpc;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.counter.CounterClosure;
+import com.alipay.sofa.jraft.counter.CounterService;
+import com.alipay.sofa.jraft.rpc.RpcContext;
+import com.alipay.sofa.jraft.rpc.RpcProcessor;
+
+/**
+ * RPC processor for {@link IncrementAndGetRequest}: delegates the update to a
+ * {@link CounterService} and replies through the RPC context once the service runs the
+ * supplied closure.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 5:43:57 PM
+ */
+public class IncrementAndGetRequestProcessor implements RpcProcessor<IncrementAndGetRequest> {
+
+    /** Service every incoming request is delegated to. */
+    private final CounterService counterService;
+
+    /**
+     * @param counterService service that performs the increment on behalf of this processor
+     */
+    public IncrementAndGetRequestProcessor(CounterService counterService) {
+        this.counterService = counterService;
+    }
+
+    /**
+     * Builds a closure that sends its own value response back via {@code rpcCtx} when run,
+     * then passes it to the counter service together with the request's delta.
+     *
+     * @param rpcCtx  context used to send the response
+     * @param request the incoming increment-and-get request
+     */
+    @Override
+    public void handleRequest(final RpcContext rpcCtx, final IncrementAndGetRequest request) {
+        final CounterClosure replyWhenDone = new CounterClosure() {
+            @Override
+            public void run(Status status) {
+                // The status argument is not inspected here; whatever response the closure
+                // holds at this point is sent as-is.
+                rpcCtx.sendResponse(getValueResponse());
+            }
+        };
+
+        this.counterService.incrementAndGet(request.getDelta(), replyWhenDone);
+    }
+
+    /**
+     * @return the fully qualified class name of {@link IncrementAndGetRequest}
+     */
+    @Override
+    public String interest() {
+        return IncrementAndGetRequest.class.getName();
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/ValueResponse.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/ValueResponse.java
new file mode 100644
index 0000000..fa7df3c
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/rpc/ValueResponse.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter.rpc;
+
+import java.io.Serializable;
+
+/**
+ * Response carrying a counter value together with a success flag, an optional redirect
+ * peer id and an optional error message.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 4:55:35 PM
+ */
+public class ValueResponse implements Serializable {
+
+    private static final long serialVersionUID = -4220017686727146773L;
+
+    /** Counter value reported by the response. */
+    private long value;
+
+    /** Success flag of the response. */
+    private boolean success;
+
+    /** Redirect peer id. */
+    private String redirect;
+
+    /** Error message attached to the response, if any. */
+    private String errorMsg;
+
+    /**
+     * Creates a response with every field at its default ({@code 0}, {@code false}, {@code null}).
+     */
+    public ValueResponse() {
+        super();
+    }
+
+    /**
+     * Creates a fully populated response.
+     *
+     * @param value    counter value
+     * @param success  success flag
+     * @param redirect redirect peer id
+     * @param errorMsg error message
+     */
+    public ValueResponse(long value, boolean success, String redirect, String errorMsg) {
+        super();
+        this.value = value;
+        this.success = success;
+        this.redirect = redirect;
+        this.errorMsg = errorMsg;
+    }
+
+    public long getValue() {
+        return this.value;
+    }
+
+    public void setValue(long value) {
+        this.value = value;
+    }
+
+    public boolean isSuccess() {
+        return this.success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getRedirect() {
+        return this.redirect;
+    }
+
+    public void setRedirect(String redirect) {
+        this.redirect = redirect;
+    }
+
+    public String getErrorMsg() {
+        return this.errorMsg;
+    }
+
+    public void setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+
+    /**
+     * @return a single-line rendering of all four fields
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("ValueResponse [value=");
+        text.append(this.value);
+        text.append(", success=").append(this.success);
+        text.append(", redirect=").append(this.redirect);
+        text.append(", errorMsg=").append(this.errorMsg);
+        text.append("]");
+        return text.toString();
+    }
+
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/snapshot/CounterSnapshotFile.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/snapshot/CounterSnapshotFile.java
new file mode 100644
index 0000000..addead8
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/counter/snapshot/CounterSnapshotFile.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.counter.snapshot;
+
+import com.alipay.sofa.jraft.util.StringUtils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Counter snapshot file: stores a single counter value as decimal text at a fixed path.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-09 5:14:55 PM
+ */
+public class CounterSnapshotFile {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CounterSnapshotFile.class);
+
+    /** Location of the snapshot file; fixed at construction. */
+    private final String path;
+
+    /**
+     * @param path location of the snapshot file
+     */
+    public CounterSnapshotFile(String path) {
+        super();
+        this.path = path;
+    }
+
+    /**
+     * @return the snapshot file location given at construction
+     */
+    public String getPath() {
+        return this.path;
+    }
+
+    /**
+     * Save value to snapshot file.
+     *
+     * @param value counter value, written as its decimal string form
+     * @return {@code true} if the file was written, {@code false} if an {@link IOException}
+     *         occurred (the failure is logged, not rethrown)
+     */
+    public boolean save(final long value) {
+        try {
+            Files.writeString(new File(path).toPath(), String.valueOf(value));
+            return true;
+        } catch (IOException e) {
+            LOG.error("Fail to save snapshot", e);
+            return false;
+        }
+    }
+
+    /**
+     * Load the value stored in the snapshot file.
+     *
+     * @return the parsed counter value
+     * @throws IOException if the file cannot be read, is blank, or does not contain a
+     *                     valid decimal {@code long}
+     */
+    public long load() throws IOException {
+        final String s = Files.readString(new File(path).toPath());
+        if (StringUtils.isBlank(s)) {
+            throw new IOException("Fail to load snapshot from " + path + ",content: " + s);
+        }
+        try {
+            // Trim first so surrounding whitespace (e.g. a trailing newline) does not break parsing.
+            return Long.parseLong(s.trim());
+        } catch (NumberFormatException e) {
+            // Report malformed content through the declared checked exception, keeping the cause.
+            throw new IOException("Fail to load snapshot from " + path + ",content: " + s, e);
+        }
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java
index 939b0de..4bd1926 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java
@@ -67,7 +67,9 @@ public class TestUtils {
}
public static String mkTempDir() {
- return Paths.get(System.getProperty("java.io.tmpdir", "/tmp"), "jraft_test_" + System.nanoTime()).toString();
+ String userDirectory = System.getProperty("user.dir");
+ // String tmpDir = System.getProperty("java.io.tmpdir", "/tmp");
+ return Paths.get(userDirectory, "jraft_test_" + System.nanoTime()).toString();
}
public static LogEntry mockEntry(final int index, final int term) {
diff --git a/modules/raft/src/test/resources/log4j2.xml b/modules/raft/src/test/resources/log4j2.xml
index 90d27d7..160cfab 100644
--- a/modules/raft/src/test/resources/log4j2.xml
+++ b/modules/raft/src/test/resources/log4j2.xml
@@ -3,7 +3,7 @@
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %c{1}:%L - %msg%n"/> <!-- yyyy = calendar year; YYYY would be the week-based year -->
</Console>
<!--<RollingFile name="RollingFile" filename="log/jraft-example.log"-->
@@ -19,7 +19,7 @@
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
- <AppenderRef ref="RollingFile"/>
+ <!--<AppenderRef ref="RollingFile"/>-->
</Root>
</Loggers>
</Configuration>
\ No newline at end of file