You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/12/31 08:40:42 UTC
[ignite-3] 03/03: IGNITE-13885 partially working.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a529ad1eaebdbd0d999b3102d20f10d931afc8af
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Thu Dec 31 11:40:20 2020 +0300
IGNITE-13885 partially working.
---
.../sofa/jraft/entity/LocalStorageOutter.java | 3 +-
.../com/alipay/sofa/jraft/entity/RaftOutter.java | 3 +-
.../com/alipay/sofa/jraft/rpc/CliRequests.java | 2 +-
.../java/com/alipay/sofa/jraft/rpc/Connection.java | 1 -
.../alipay/sofa/jraft/rpc/HasErrorResponse.java | 2 +-
.../sofa/jraft/rpc/MessageBuilderFactory.java | 28 +++-
.../com/alipay/sofa/jraft/rpc/RaftRpcFactory.java | 9 --
.../com/alipay/sofa/jraft/rpc/RpcRequests.java | 26 ++--
.../sofa/jraft/rpc/impl/AbstractClientService.java | 11 +-
.../sofa/jraft/rpc/impl/LocalConnection.java | 35 +++++
.../sofa/jraft/rpc/impl/LocalRaftRpcFactory.java | 35 ++++-
.../alipay/sofa/jraft/rpc/impl/LocalRpcClient.java | 72 ++++++++-
.../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 165 ++++++++++++++++++++-
.../rpc/message/AppendEntriesRequestImpl.java | 130 ++++++++++++++++
.../rpc/message/AppendEntriesResponseImpl.java | 47 ++++++
.../rpc/message/DefaultMessageBuilderFactory.java | 53 ++++++-
.../sofa/jraft/rpc/message/EntryMetaImpl.java | 133 +++++++++++++++++
.../sofa/jraft/rpc/message/ErrorResponseImpl.java | 32 ++++
.../sofa/jraft/rpc/message/PingRequestImpl.java | 21 +++
.../sofa/jraft/rpc/message/PreVoteRequestImpl.java | 87 +++++++++++
.../jraft/rpc/message/ReadIndexRequestImpl.java | 76 ++++++++++
.../jraft/rpc/message/ReadIndexResponseImpl.java | 36 +++++
.../jraft/rpc/message/RequestVoteResponseImpl.java | 36 +++++
.../alipay/sofa/jraft/rpc/message/StableMeta.java | 32 ++++
.../jraft/rpc/message/TimeoutNowRequestImpl.java | 54 +++++++
.../jraft/rpc/message/TimeoutNowResponseImpl.java | 36 +++++
.../com/alipay/sofa/jraft/util/JDKMarshaller.java | 40 +++--
.../com/alipay/sofa/jraft/util/Marshaller.java | 4 +-
.../java/com/alipay/sofa/jraft/util/Utils.java | 2 +-
.../com.alipay.sofa.jraft.rpc.RaftRpcFactory | 2 +-
.../com/alipay/sofa/jraft/rpc/LocalRpcTest.java | 154 +++++++++++++++++++
.../jraft/storage/impl/LocalLogStorageTest.java | 5 -
.../sofa/jraft/storage/io/LocalFileReaderTest.java | 1 -
modules/raft/src/test/resources/log4j2.xml | 25 ++++
34 files changed, 1317 insertions(+), 81 deletions(-)
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
index 81e54b4..d483818 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
@@ -20,6 +20,7 @@
package com.alipay.sofa.jraft.entity;
import com.alipay.sofa.jraft.rpc.Message;
+import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
import com.alipay.sofa.jraft.storage.RaftMetaStorage;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import java.nio.ByteBuffer;
@@ -45,7 +46,7 @@ public final class LocalStorageOutter {
public interface StablePBMeta extends Message {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createStableMeta();
}
long getTerm();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
index 83d40ea..b8b78a3 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
@@ -19,12 +19,13 @@
package com.alipay.sofa.jraft.entity;
+import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
import com.alipay.sofa.jraft.rpc.RpcRequests;
public final class RaftOutter {
public interface EntryMeta {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createEntryMeta();
}
long getTerm();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
index 4c8776d..38732c0 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
@@ -38,7 +38,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return MessageBuilderFactory.DEFAULT.createAddPeer();
+ return MessageBuilderFactory.DEFAULT.createAddPeerRequest();
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java
index 9bae02a..256bf6f 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java
@@ -22,7 +22,6 @@ package com.alipay.sofa.jraft.rpc;
* @author jiachun.fjc
*/
public interface Connection {
-
/**
* Get the attribute that bound to the connection.
*
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java
index 917b05f..59c8d5c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java
@@ -1,5 +1,5 @@
package com.alipay.sofa.jraft.rpc;
public interface HasErrorResponse extends Message {
- RpcRequests.ErrorResponse getErrorResponse();
+ RpcRequests.ErrorResponse getErrorResponse(); // TODO asch: this accessor can be removed; implementations may return null.
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
index 49aed96..a286c6c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
@@ -1,13 +1,39 @@
package com.alipay.sofa.jraft.rpc;
import com.alipay.sofa.jraft.entity.LocalFileMetaOutter;
+import com.alipay.sofa.jraft.entity.LocalStorageOutter;
+import com.alipay.sofa.jraft.entity.RaftOutter;
import com.alipay.sofa.jraft.rpc.message.DefaultMessageBuilderFactory;
// TODO asch use JRaftServiceLoader ?
public interface MessageBuilderFactory {
public static MessageBuilderFactory DEFAULT = new DefaultMessageBuilderFactory();
- CliRequests.AddPeerRequest.Builder createAddPeer();
+ CliRequests.AddPeerRequest.Builder createAddPeerRequest(); // Renamed from createAddPeer() by this commit.
LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta();
+ // The methods below back the static newBuilder() methods of the message interfaces, which call them through DEFAULT.
+ RpcRequests.PingRequest.Builder createPingRequest();
+ // "Vote" in the next two methods refers to RequestVoteRequest / RequestVoteResponse.
+ RpcRequests.RequestVoteRequest.Builder createVoteRequest();
+
+ RpcRequests.RequestVoteResponse.Builder createVoteResponse();
+
+ RpcRequests.ErrorResponse.Builder createErrorResponse();
+ // Builder for the StablePBMeta message declared in LocalStorageOutter.
+ LocalStorageOutter.StablePBMeta.Builder createStableMeta();
+
+ RpcRequests.AppendEntriesRequest.Builder createAppendEntriesRequest();
+
+ RpcRequests.AppendEntriesResponse.Builder createAppendEntriesResponse();
+ // Builder for the EntryMeta type declared in RaftOutter.
+ RaftOutter.EntryMeta.Builder createEntryMeta();
+
+ RpcRequests.TimeoutNowRequest.Builder createTimeoutNowRequest();
+
+ RpcRequests.TimeoutNowResponse.Builder createTimeoutNowResponse();
+
+ RpcRequests.ReadIndexRequest.Builder createReadIndexRequest();
+
+ RpcRequests.ReadIndexResponse.Builder createReadIndexResponse();
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java
index acc19c5..2989f25 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java
@@ -29,15 +29,6 @@ public interface RaftRpcFactory {
RpcResponseFactory DEFAULT = new RpcResponseFactory() {};
/**
- * Register serializer with class name.
- *
- * @param className class name
- * @param args extended parameters, different implementers may need different parameters,
- * the order of parameters need a convention
- */
- void registerProtobufSerializer(final String className, final Object... args);
-
- /**
* Creates a raft RPC client.
*
* @return a new rpc client instance
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
index 74568b7..eb6ac4f 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
@@ -19,14 +19,8 @@
package com.alipay.sofa.jraft.rpc;
-import com.alipay.sofa.jraft.RaftGroupService;
-import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.RaftOutter;
-import com.alipay.sofa.jraft.option.BootstrapOptions;
-import com.alipay.sofa.jraft.option.ReplicatorOptions;
import com.alipay.sofa.jraft.util.ByteString;
-import com.alipay.sofa.jraft.util.DisruptorBuilder;
-import java.io.ByteArrayOutputStream;
public final class RpcRequests {
private RpcRequests() {
@@ -45,7 +39,7 @@ public final class RpcRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createPingRequest();
}
}
@@ -73,7 +67,7 @@ public final class RpcRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createErrorResponse();
}
}
@@ -140,7 +134,7 @@ public final class RpcRequests {
public interface TimeoutNowRequest extends Message {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createTimeoutNowRequest();
}
java.lang.String getGroupId();
@@ -166,7 +160,7 @@ public final class RpcRequests {
public interface TimeoutNowResponse extends HasErrorResponse {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createTimeoutNowResponse();
}
static Message getDefaultInstance() {
@@ -231,7 +225,7 @@ public final class RpcRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createVoteRequest();
}
}
@@ -241,7 +235,7 @@ public final class RpcRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createVoteResponse();
}
/**
@@ -292,7 +286,7 @@ public final class RpcRequests {
public interface AppendEntriesRequest extends Message {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createAppendEntriesRequest();
}
/**
@@ -355,7 +349,7 @@ public final class RpcRequests {
}
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createAppendEntriesResponse();
}
long getTerm();
@@ -445,7 +439,7 @@ public final class RpcRequests {
public interface ReadIndexRequest extends Message {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createReadIndexRequest();
}
java.lang.String getGroupId();
@@ -477,7 +471,7 @@ public final class RpcRequests {
public interface ReadIndexResponse extends HasErrorResponse {
static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.createReadIndexResponse();
}
static Message getDefaultInstance() {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
index 80e7d1e..5701370 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
@@ -219,10 +219,15 @@ public abstract class AbstractClientService implements ClientService {
if (result instanceof ErrorResponse) {
status = handleErrorResponse((ErrorResponse) result);
msg = (Message) result;
- } else if (result instanceof HasErrorResponse) {
+ } else if (result instanceof HasErrorResponse) { // TODO asch we don't need this.
final ErrorResponse eResp = ((HasErrorResponse) result).getErrorResponse();
- status = handleErrorResponse(eResp);
- msg = (Message) eResp;
+ if (eResp != null) {
+ status = handleErrorResponse(eResp);
+ msg = eResp;
+ }
+ else {
+ msg = (T) result;
+ }
} else {
msg = (T) result;
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
new file mode 100644
index 0000000..0607196
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -0,0 +1,35 @@
+package com.alipay.sofa.jraft.rpc.impl;
+
+import com.alipay.sofa.jraft.rpc.Connection;
+import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LocalConnection implements Connection { // In-process Connection between a LocalRpcClient and a server endpoint.
+ private Map<String, Object> attrs = new ConcurrentHashMap<>(); // Attributes bound to this connection.
+ // Both fields are package-visible; LocalRpcClient.onCreated reads srv directly.
+ final LocalRpcClient client;
+ final Endpoint srv;
+ /** Creates a connection from {@code client} to the server registered at {@code srv}. */
+ public LocalConnection(LocalRpcClient client, Endpoint srv) {
+ this.client = client;
+ this.srv = srv;
+ }
+ /** Returns the attribute bound to {@code key}, or {@code null} when nothing is bound. */
+ @Override public Object getAttribute(String key) {
+ return attrs.get(key);
+ }
+ /** Binds {@code value} to {@code key}; the backing ConcurrentHashMap rejects null keys and null values. */
+ @Override public void setAttribute(String key, Object value) {
+ attrs.put(key, value);
+ }
+ /** Binds {@code value} only if {@code key} is unbound; returns the previous value, or {@code null} if there was none. */
+ @Override public Object setAttributeIfAbsent(String key, Object value) {
+ return attrs.putIfAbsent(key, value);
+ }
+ /** Delegates to {@code LocalRpcServer.closeConnection}, which removes the connection and notifies the server's close listeners. */
+ @Override public void close() {
+ LocalRpcServer.closeConnection(client, srv);
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
index acaa136..ac53089 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
@@ -16,6 +16,7 @@
*/
package com.alipay.sofa.jraft.rpc.impl;
+import com.alipay.sofa.jraft.option.RpcOptions;
import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcServer;
@@ -31,16 +32,40 @@ import org.slf4j.LoggerFactory;
@SPI
public class LocalRaftRpcFactory implements RaftRpcFactory {
private static final Logger LOG = LoggerFactory.getLogger(LocalRaftRpcFactory.class);
+ @Override public RpcClient createRpcClient(ConfigHelper<RpcClient> helper) { // Creates a LocalRpcClient and applies the optional helper to it.
+ LocalRpcClient rpcClient = new LocalRpcClient();
- @Override public void registerProtobufSerializer(String className, Object... args) {
+ if (helper != null) // The helper is optional.
+ helper.config(rpcClient);
+ return rpcClient;
}
- @Override public RpcClient createRpcClient(ConfigHelper<RpcClient> helper) {
- return null;
+ @Override public RpcServer createRpcServer(Endpoint endpoint, ConfigHelper<RpcServer> helper) { // Creates a LocalRpcServer for the endpoint; it is not started here.
+ LocalRpcServer srv = new LocalRpcServer(endpoint);
+
+ if (helper != null)
+ helper.config(srv);
+
+ return srv;
}
- @Override public RpcServer createRpcServer(Endpoint endpoint, ConfigHelper<RpcServer> helper) {
- return null;
+ @Override public ConfigHelper<RpcServer> defaultJRaftServerConfigHelper(RpcOptions opts) { // Placeholder: the returned helper configures nothing yet.
+ return new ConfigHelper<RpcServer>() {
+ @Override public void config(RpcServer instance) {
+ LocalRpcServer srv = (LocalRpcServer) instance; // Unused for now; the cast throws ClassCastException for a non-local server.
+ // TODO asch.
+ }
+ };
+ }
+ // Client-side counterpart of the helper above; also a placeholder.
+ @Override
+ public ConfigHelper<RpcClient> defaultJRaftClientConfigHelper(final RpcOptions opts) {
+ return new ConfigHelper<RpcClient>() {
+ @Override public void config(RpcClient instance) {
+ LocalRpcClient rpcClient = (LocalRpcClient) instance; // Unused for now; the cast throws ClassCastException for a non-local client.
+ // TODO asch.
+ }
+ };
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
index a6c063b..cbff3d6 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
@@ -17,41 +17,95 @@
package com.alipay.sofa.jraft.rpc.impl;
import com.alipay.sofa.jraft.ReplicatorGroup;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.error.InvokeTimeoutException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.RpcOptions;
+import com.alipay.sofa.jraft.rpc.Connection;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
/**
- * Bolt rpc client impl.
+ * Local rpc client impl.
*
- * @author jiachun.fjc
+ * @author ascherbakov.
*/
public class LocalRpcClient implements RpcClient {
+ private volatile ReplicatorGroup replicatorGroup = null;
+
@Override public boolean checkConnection(Endpoint endpoint) {
- return false;
+ return LocalRpcServer.connect(this, endpoint, false, null); // Probe only: createIfAbsent is false, so the null callback is never invoked.
}
@Override public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) {
- return false;
+ return LocalRpcServer.connect(this, endpoint, createIfAbsent, this::onCreated); // onCreated runs only when a new connection is actually created.
}
@Override public void closeConnection(Endpoint endpoint) {
-
+ LocalRpcServer.closeConnection(this, endpoint); // No-op when no server is registered for the endpoint or no connection exists.
}
@Override public void registerConnectEventListener(ReplicatorGroup replicatorGroup) {
+ this.replicatorGroup = replicatorGroup; // Kept so that onCreated() can call checkReplicator() for newly created connections.
+ }
+ private void onCreated(LocalConnection conn) { // Callback passed to LocalRpcServer.connect(); invoked for each newly created connection.
+ if (replicatorGroup != null) { // Nothing to notify until registerConnectEventListener() has been called.
+ final PeerId peer = new PeerId();
+ if (peer.parse(conn.srv.toString())) {
+ replicatorGroup.checkReplicator(peer, true);
+ }
+ else
+ System.out.println("Fail to parse peer: " + conn.srv); // Report the unparsable endpoint, not the unparsed PeerId. TODO asch use a logger.
+ }
}
@Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException {
- return null;
+ if (!checkConnection(endpoint, true)) // Connects on demand; false means no server is registered for the endpoint.
+ throw new RemotingException("Server is dead " + endpoint);
+
+ LocalRpcServer srv = LocalRpcServer.servers.get(endpoint);
+ if (srv == null) // The server may have been removed since the check above.
+ throw new RemotingException("Server is dead " + endpoint);
+
+ CompletableFuture fut = new CompletableFuture();
+
+ Object[] tuple = {this, request, fut}; // {sender, request, response future}: the layout the server worker unpacks.
+ srv.incoming.offer(tuple); // Never fails (unbounded queue). Not wrapped in assert: without -ea the request would never be enqueued.
+
+ try {
+ return fut.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new RemotingException(e);
+ } catch (TimeoutException e) {
+ throw new InvokeTimeoutException(e);
+ }
}
@Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback, long timeoutMs) throws InterruptedException, RemotingException {
+ if (!checkConnection(endpoint, true)) // Connects on demand; false means no server is registered for the endpoint.
+ throw new RemotingException("Server is dead " + endpoint);
+
+ LocalRpcServer srv = LocalRpcServer.servers.get(endpoint);
+ if (srv == null) // The server may have been removed since the check above.
+ throw new RemotingException("Server is dead " + endpoint);
+
+ CompletableFuture fut = new CompletableFuture();
+
+ Object[] tuple = {this, request, fut}; // {sender, request, response future}: the layout the server worker unpacks.
+ srv.incoming.offer(tuple); // Never fails (unbounded queue). Not wrapped in assert: without -ea the request would never be enqueued.
+ fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).whenComplete((BiConsumer<Object, Throwable>) (res, err) -> { // Time out fut itself so the callback also fires on timeout.
+ callback.complete(res, err);
+ });
}
@Override public boolean init(RpcOptions opts) {
@@ -59,6 +113,10 @@ public class LocalRpcClient implements RpcClient {
}
@Override public void shutdown() {
-
+ // Close all connections from this client, on every server currently registered.
+ for (LocalRpcServer value : LocalRpcServer.servers.values())
+ LocalRpcServer.closeConnection(this, value.local);
}
+
+
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index 4423a42..71ae19b 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -16,32 +16,181 @@
*/
package com.alipay.sofa.jraft.rpc.impl;
+import com.alipay.sofa.jraft.rpc.Connection;
+import com.alipay.sofa.jraft.rpc.Message;
+import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
+import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
/**
- * Bolt RPC server impl.
+ * Local RPC server impl.
*
- * @author jiachun.fjc
+ * @author ascherbakov.
*/
public class LocalRpcServer implements RpcServer {
- @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
+ /** Running servers. */
+ public static ConcurrentMap<Endpoint, LocalRpcServer> servers = new ConcurrentHashMap<>();
+
+ Endpoint local; // Endpoint this server is registered under in the servers map.
+
+ /** Remote connections to this server. */
+ public ConcurrentMap<LocalRpcClient, LocalConnection> conns = new ConcurrentHashMap<>();
+
+ private Map<String, RpcProcessor> processors = new ConcurrentHashMap<>(); // Keyed by RpcProcessor.interest().
+
+ private volatile boolean started = false; // Loop condition of the worker thread.
+
+ private Thread worker; // Single thread that drains the incoming queue.
+ private List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
+ // Each element is {LocalRpcClient sender, Message request, CompletableFuture response}.
+ BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch use some kind of MPSC queue.
+
+ public LocalRpcServer(Endpoint local) {
+ this.local = local;
}
- @Override public void registerProcessor(RpcProcessor<?> processor) {
+ static synchronized boolean connect(LocalRpcClient client, Endpoint srv, boolean createIfAbsent, Consumer<LocalConnection> onCreated) { // Returns true when a connection exists or was created.
+ LocalRpcServer locSrv = servers.get(srv);
+
+ if (locSrv == null)
+ return false; // Server is dead.
+
+ LocalConnection conn = locSrv.conns.get(client);
+
+ if (conn == null) {
+ if (!createIfAbsent)
+ return false;
+
+ conn = new LocalConnection(client, srv);
+
+ locSrv.conns.put(client, conn);
+
+ onCreated.accept(conn); // Reached only when createIfAbsent is true, so callers must pass a non-null callback in that case.
+ }
+
+ return true;
+ }
+ /** Removes the client's connection to the server at {@code srv}, if both exist, and notifies that server's close listeners. */
+ static synchronized void closeConnection(LocalRpcClient client, Endpoint srv) {
+ LocalRpcServer locSrv = servers.get(srv);
+
+ if (locSrv == null)
+ return;
+
+ LocalConnection conn = locSrv.conns.remove(client);
+
+ if (conn == null)
+ return;
+
+ locSrv.listeners.forEach(l -> l.onClosed(client.toString(), conn));
+ }
+ /** Adds the listener unless an equal one is already registered. */
+ @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
+ if (!listeners.contains(listener))
+ listeners.add(listener);
+ }
@Override public void registerProcessor(RpcProcessor<?> processor) {
+ processors.put(processor.interest(), processor); // A later processor with the same interest replaces the earlier one.
}
@Override public int boundPort() {
- return 0;
+ return local.getPort();
}
- @Override public boolean init(Void opts) {
- return false;
+ @Override public synchronized boolean init(Void opts) { // Starts the worker and registers this server; returns false if already started.
+ if (started)
+ return false;
+
+ worker = new Thread(new Runnable() {
+ @Override public void run() {
+ while(started) {
+ try {
+ Object[] tuple = incoming.take();
+ LocalRpcClient sender = (LocalRpcClient) tuple[0];
+
+ // Connection is not established, ignore message.
+ LocalConnection conn = conns.get(sender);
+ if (conn == null)
+ continue;
+
+ Message msg = (Message) tuple[1];
+ CompletableFuture<Object> fut = (CompletableFuture) tuple[2];
+
+ Class<? extends Message> cls = msg.getClass();
+ RpcProcessor prc = processors.get(cls.getName()); // First try the concrete class name.
+
+ // TODO asch cache it.
+ if (prc == null) { // Then fall back to the names of the interfaces the class directly implements.
+ for (Class<?> iface : cls.getInterfaces()) {
+ prc = processors.get(iface.getName());
+
+ if (prc != null)
+ break;
+ }
+ }
+
+ if (prc == null) {
+ fut.completeExceptionally(new IllegalStateException("No processor registered for " + cls.getName()));
+ continue; // Fail the call; dereferencing null here would kill the worker thread.
+ }
+ prc.handleRequest(new RpcContext() {
+ @Override public void sendResponse(Object responseObj) {
+ fut.complete(responseObj);
+ }
+
+ @Override public Connection getConnection() {
+ return conn;
+ }
+
+ @Override public String getRemoteAddress() {
+ return sender.toString();
+ }
+ }, msg);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ });
+
+ worker.setName("LocalRPCServer-Thread: " + local.toString());
+ started = true; // Set before start(): the worker loop tests this flag and could otherwise exit immediately.
+
+ servers.put(local, this);
+
+ worker.start();
+
+ return true;
}
- @Override public void shutdown() {
+ @Override public synchronized void shutdown() { // Stops the worker, closes every connection, then unregisters the server.
+ if (!started)
+ return;
+
+ started = false;
+ worker.interrupt();
+ try {
+ worker.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Keep the interrupt visible to callers.
+ throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local, e);
+ }
+ // Close all connections to this server.
+ for (LocalRpcClient client : conns.keySet())
+ closeConnection(client, local);
+ servers.remove(local); // Must come last: closeConnection() looks this server up in the servers map.
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java
new file mode 100644
index 0000000..416cc30
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java
@@ -0,0 +1,130 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.entity.RaftOutter;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+import com.alipay.sofa.jraft.util.ByteString;
+import com.alipay.sofa.jraft.util.Marshaller;
+import java.util.ArrayList;
+import java.util.List;
+
+class AppendEntriesRequestImpl implements RpcRequests.AppendEntriesRequest, RpcRequests.AppendEntriesRequest.Builder { // Message and its own builder in one mutable object.
+ private String groupId;
+ private String serverId;
+ private String peerId;
+ private long term;
+ private long prevLogTerm;
+ private long prevLogIndex;
+ private List<RaftOutter.EntryMeta> entiesList = new ArrayList<>(); // Backs the three getEntries* accessors; "enties" is a typo of "entries".
+ private long committedIndex;
+ private ByteString data = ByteString.EMPTY; // EMPTY doubles as the "not set" marker, see hasData().
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getServerId() {
+ return serverId;
+ }
+
+ @Override public String getPeerId() {
+ return peerId;
+ }
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public long getPrevLogTerm() {
+ return prevLogTerm;
+ }
+
+ @Override public long getPrevLogIndex() {
+ return prevLogIndex;
+ }
+ /** Returns the live internal list, not a copy. */
+ @Override public List<RaftOutter.EntryMeta> getEntriesList() {
+ return entiesList;
+ }
+
+ @Override public RaftOutter.EntryMeta getEntries(int index) {
+ return entiesList.get(index);
+ }
+
+ @Override public int getEntriesCount() {
+ return entiesList.size();
+ }
+
+ @Override public long getCommittedIndex() {
+ return committedIndex;
+ }
+
+ @Override public ByteString getData() {
+ return data;
+ }
+ /** Returns {@code true} unless {@code data} is still the very {@code ByteString.EMPTY} instance (identity comparison). */
+ @Override public boolean hasData() {
+ return data != ByteString.EMPTY;
+ }
+ /** Serializes this message with {@code Marshaller.DEFAULT}. */
+ @Override public byte[] toByteArray() {
+ return Marshaller.DEFAULT.marshall(this);
+ }
+ /** Returns this same instance, so later setter calls stay visible through the returned message. */
+ @Override public RpcRequests.AppendEntriesRequest build() {
+ return this;
+ }
+ // Builder setters: each stores its argument and returns this for chaining.
+ @Override public Builder setData(ByteString data) {
+ this.data = data;
+
+ return this;
+ }
+
+ @Override public Builder setTerm(long term) {
+ this.term = term;
+
+ return this;
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setServerId(String serverId) {
+ this.serverId = serverId;
+
+ return this;
+ }
+
+ @Override public Builder setPeerId(String peerId) {
+ this.peerId = peerId;
+
+ return this;
+ }
+
+ @Override public Builder setPrevLogIndex(long prevLogIndex) {
+ this.prevLogIndex = prevLogIndex;
+
+ return this;
+ }
+
+ @Override public Builder setPrevLogTerm(long prevLogTerm) {
+ this.prevLogTerm = prevLogTerm;
+
+ return this;
+ }
+
+ @Override public Builder setCommittedIndex(long lastCommittedIndex) {
+ this.committedIndex = lastCommittedIndex;
+
+ return this;
+ }
+ /** Appends {@code entryMeta} to the entries list. */
+ @Override public Builder addEntries(RaftOutter.EntryMeta entryMeta) {
+ entiesList.add(entryMeta);
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java
new file mode 100644
index 0000000..9187ab6
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java
@@ -0,0 +1,47 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+public class AppendEntriesResponseImpl implements RpcRequests.AppendEntriesResponse, RpcRequests.AppendEntriesResponse.Builder { // Message and its own builder in one mutable object.
+ private long term;
+ private boolean success;
+ private long lastLogIndex;
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public boolean getSuccess() {
+ return success;
+ }
+
+ @Override public long getLastLogIndex() {
+ return lastLogIndex;
+ }
+ /** Always {@code null}: this implementation carries no error response. */
+ @Override public RpcRequests.ErrorResponse getErrorResponse() {
+ return null;
+ }
+ /** Returns this same instance. */
+ @Override public RpcRequests.AppendEntriesResponse build() {
+ return this;
+ }
+ // Builder setters: each stores its argument and returns this for chaining.
+ @Override public Builder setSuccess(boolean success) {
+ this.success = success;
+
+ return this;
+ }
+
+ @Override public Builder setTerm(long currTerm) {
+ this.term = currTerm;
+
+ return this;
+ }
+
+ @Override public Builder setLastLogIndex(long lastLogIndex) {
+ this.lastLogIndex = lastLogIndex;
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
index 14a3fb4..9b8b54d 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
@@ -1,15 +1,66 @@
package com.alipay.sofa.jraft.rpc.message;
import com.alipay.sofa.jraft.entity.LocalFileMetaOutter;
+import com.alipay.sofa.jraft.entity.LocalStorageOutter;
+import com.alipay.sofa.jraft.entity.RaftOutter;
import com.alipay.sofa.jraft.rpc.CliRequests;
import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
public class DefaultMessageBuilderFactory implements MessageBuilderFactory {
- @Override public CliRequests.AddPeerRequest.Builder createAddPeer() {
+ @Override public CliRequests.AddPeerRequest.Builder createAddPeerRequest() { // Renamed from createAddPeer() to match the interface.
return new AddPeerRequestImpl();
}
@Override public LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta() {
return new LocalFileMetaImpl();
}
+ // Every factory method below returns a fresh instance of the matching implementation class.
+ @Override public RpcRequests.PingRequest.Builder createPingRequest() {
+ return new PingRequestImpl();
+ }
+
+ @Override public RpcRequests.RequestVoteRequest.Builder createVoteRequest() {
+ return new PreVoteRequestImpl(); // NOTE(review): the RequestVote builder is backed by a class named PreVoteRequestImpl; confirm the naming is intended.
+ }
+
+ @Override public RpcRequests.RequestVoteResponse.Builder createVoteResponse() {
+ return new RequestVoteResponseImpl();
+ }
+
+ @Override public RpcRequests.ErrorResponse.Builder createErrorResponse() {
+ return new ErrorResponseImpl();
+ }
+
+ @Override public LocalStorageOutter.StablePBMeta.Builder createStableMeta() {
+ return new StableMeta(); // Unlike its siblings, this implementation class has no "Impl" suffix.
+ }
+
+ @Override public RpcRequests.AppendEntriesRequest.Builder createAppendEntriesRequest() {
+ return new AppendEntriesRequestImpl();
+ }
+
+ @Override public RpcRequests.AppendEntriesResponse.Builder createAppendEntriesResponse() {
+ return new AppendEntriesResponseImpl();
+ }
+
+ @Override public RaftOutter.EntryMeta.Builder createEntryMeta() {
+ return new EntryMetaImpl();
+ }
+
+ @Override public RpcRequests.TimeoutNowRequest.Builder createTimeoutNowRequest() {
+ return new TimeoutNowRequestImpl();
+ }
+
+ @Override public RpcRequests.TimeoutNowResponse.Builder createTimeoutNowResponse() {
+ return new TimeoutNowResponseImpl();
+ }
+
+ @Override public RpcRequests.ReadIndexRequest.Builder createReadIndexRequest() {
+ return new ReadIndexRequestImpl();
+ }
+
+ @Override public RpcRequests.ReadIndexResponse.Builder createReadIndexResponse() {
+ return new ReadIndexResponseImpl();
+ }
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java
new file mode 100644
index 0000000..5356d76
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java
@@ -0,0 +1,133 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.entity.EnumOutter;
+import com.alipay.sofa.jraft.entity.RaftOutter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link RaftOutter.EntryMeta} and its builder in one mutable object: the setters and
+ * add* methods change this instance and {@link #build()} returns the same instance.
+ */
+class EntryMetaImpl implements RaftOutter.EntryMeta, RaftOutter.EntryMeta.Builder {
+    // Scalar attributes.
+    private long term;
+    private long checksum;
+    private long dataLen;
+    private EnumOutter.EntryType type;
+    // Identifier lists, appended to by the add* methods; the getters return the live list.
+    private List<String> peersList = new ArrayList<>();
+    private List<String> oldPeersList = new ArrayList<>();
+    private List<String> learnersList = new ArrayList<>();
+    private List<String> oldLearnersList = new ArrayList<>();
+
+    @Override public RaftOutter.EntryMeta build() {
+        return this;
+    }
+
+    // ---- scalar attributes: getter followed by the matching builder setter ----
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+        return this;
+    }
+
+    @Override public long getChecksum() {
+        return checksum;
+    }
+
+    @Override public Builder setChecksum(long checksum) {
+        this.checksum = checksum;
+        return this;
+    }
+
+    @Override public EnumOutter.EntryType getType() {
+        return type;
+    }
+
+    @Override public Builder setType(EnumOutter.EntryType type) {
+        this.type = type;
+        return this;
+    }
+
+    @Override public long getDataLen() {
+        return dataLen;
+    }
+
+    @Override public Builder setDataLen(int remaining) {
+        this.dataLen = remaining;
+        return this;
+    }
+
+    // ---- list attributes: whole list, size, indexed access, append ----
+    @Override public List<String> getPeersList() {
+        return peersList;
+    }
+
+    @Override public int getPeersCount() {
+        return peersList.size();
+    }
+
+    @Override public String getPeers(int index) {
+        return peersList.get(index);
+    }
+
+    @Override public Builder addPeers(String peerId) {
+        peersList.add(peerId);
+        return this;
+    }
+
+    @Override public List<String> getOldPeersList() {
+        return oldPeersList;
+    }
+
+    @Override public int getOldPeersCount() {
+        return oldPeersList.size();
+    }
+
+    @Override public String getOldPeers(int index) {
+        return oldPeersList.get(index);
+    }
+
+    @Override public Builder addOldPeers(String oldPeerId) {
+        oldPeersList.add(oldPeerId);
+        return this;
+    }
+
+    @Override public List<String> getLearnersList() {
+        return learnersList;
+    }
+
+    @Override public int getLearnersCount() {
+        return learnersList.size();
+    }
+
+    @Override public String getLearners(int index) {
+        return learnersList.get(index);
+    }
+
+    @Override public Builder addLearners(String learnerId) {
+        learnersList.add(learnerId);
+        return this;
+    }
+
+    @Override public List<String> getOldLearnersList() {
+        return oldLearnersList;
+    }
+
+    @Override public int getOldLearnersCount() {
+        return oldLearnersList.size();
+    }
+
+    @Override public String getOldLearners(int index) {
+        return oldLearnersList.get(index);
+    }
+
+    @Override public Builder addOldLearners(String oldLearnerId) {
+        oldLearnersList.add(oldLearnerId);
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java
new file mode 100644
index 0000000..58447d0
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java
@@ -0,0 +1,32 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+/** Mutable {@link RpcRequests.ErrorResponse} that acts as its own builder; {@link #build()} returns {@code this}. */
+public class ErrorResponseImpl implements RpcRequests.ErrorResponse, RpcRequests.ErrorResponse.Builder {
+    private String errorMsg;
+    private int errorCode;
+
+    @Override public RpcRequests.ErrorResponse build() {
+        return this;
+    }
+
+    @Override public int getErrorCode() {
+        return errorCode;
+    }
+
+    @Override public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    // Builder setters: each one mutates this instance and returns it so calls can be chained.
+    @Override public Builder setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+        return this;
+    }
+
+    @Override public Builder setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java
new file mode 100644
index 0000000..480f9a4
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java
@@ -0,0 +1,21 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+/** Ping message that is also its own builder: {@link #build()} returns this same instance. */
+class PingRequestImpl implements RpcRequests.PingRequest, RpcRequests.PingRequest.Builder {
+    private long sendTimestamp;
+
+    @Override public RpcRequests.PingRequest build() {
+        return this;
+    }
+
+    @Override public Builder setSendTimestamp(long timestamp) {
+        this.sendTimestamp = timestamp;
+        return this;
+    }
+
+    @Override public long getSendTimestamp() {
+        return sendTimestamp;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java
new file mode 100644
index 0000000..d5303c3
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java
@@ -0,0 +1,87 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+/**
+ * {@link RpcRequests.RequestVoteRequest} together with its builder in one mutable object.
+ * Despite the class name, {@code preVote} is a plain settable flag and may hold either value.
+ */
+class PreVoteRequestImpl implements RpcRequests.RequestVoteRequest, RpcRequests.RequestVoteRequest.Builder {
+    private String groupId;
+    private String serverId;
+    private String peerId;
+    private long term;
+    private long lastLogTerm;
+    private long lastLogIndex;
+    private boolean preVote;
+
+    @Override public RpcRequests.RequestVoteRequest build() {
+        return this;
+    }
+
+    // ---- read side ----
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public String getServerId() {
+        return serverId;
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public long getLastLogTerm() {
+        return lastLogTerm;
+    }
+
+    @Override public long getLastLogIndex() {
+        return lastLogIndex;
+    }
+
+    // NOTE(review): the only accessor without @Override; confirm whether the interface declares it.
+    public boolean getPreVote() {
+        return preVote;
+    }
+
+    // ---- builder side: each setter stores the value and returns this for chaining ----
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+        return this;
+    }
+
+    @Override public Builder setServerId(String serverId) {
+        this.serverId = serverId;
+        return this;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+        return this;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+        return this;
+    }
+
+    @Override public Builder setLastLogTerm(long lastLogTerm) {
+        this.lastLogTerm = lastLogTerm;
+        return this;
+    }
+
+    @Override public Builder setLastLogIndex(long lastLogIndex) {
+        this.lastLogIndex = lastLogIndex;
+        return this;
+    }
+
+    @Override public Builder setPreVote(boolean preVote) {
+        this.preVote = preVote;
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java
new file mode 100644
index 0000000..dd1868e
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java
@@ -0,0 +1,76 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+import com.alipay.sofa.jraft.util.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link RpcRequests.ReadIndexRequest} and its builder in a single mutable object;
+ * {@link #build()} hands back this very instance.
+ */
+class ReadIndexRequestImpl implements RpcRequests.ReadIndexRequest, RpcRequests.ReadIndexRequest.Builder {
+    private String groupId;
+    private String serverId;
+    private String peerId;
+    private List<ByteString> entriesList = new ArrayList<>();
+
+    @Override public RpcRequests.ReadIndexRequest build() {
+        return this;
+    }
+
+    /** Copies the three ids from {@code request} and appends each of its entries to this builder. */
+    @Override public Builder mergeFrom(RpcRequests.ReadIndexRequest request) {
+        setGroupId(request.getGroupId())
+            .setServerId(request.getServerId())
+            .setPeerId(request.getPeerId());
+        for (ByteString entry : request.getEntriesList()) {
+            addEntries(entry);
+        }
+        return this;
+    }
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+        return this;
+    }
+
+    @Override public String getServerId() {
+        return serverId;
+    }
+
+    @Override public Builder setServerId(String serverId) {
+        this.serverId = serverId;
+        return this;
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+        return this;
+    }
+
+    @Override public List<ByteString> getEntriesList() {
+        return entriesList;
+    }
+
+    @Override public int getEntriesCount() {
+        return entriesList.size();
+    }
+
+    @Override public ByteString getEntries(int index) {
+        return entriesList.get(index);
+    }
+
+    @Override public Builder addEntries(ByteString data) {
+        entriesList.add(data);
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java
new file mode 100644
index 0000000..6cc658f
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java
@@ -0,0 +1,36 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+class ReadIndexResponseImpl implements RpcRequests.ReadIndexResponse, RpcRequests.ReadIndexResponse.Builder {
+ private long index;
+ private boolean success;
+
+ @Override public long getIndex() {
+ return index;
+ }
+
+ @Override public boolean getSuccess() {
+ return success;
+ }
+
+ @Override public RpcRequests.ErrorResponse getErrorResponse() {
+ return null;
+ }
+
+ @Override public RpcRequests.ReadIndexResponse build() {
+ return this;
+ }
+
+ @Override public Builder setSuccess(boolean success) {
+ this.success = success;
+
+ return this;
+ }
+
+ @Override public Builder setIndex(long lastCommittedIndex) {
+ this.index = index;
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java
new file mode 100644
index 0000000..e628940
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java
@@ -0,0 +1,36 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+/** Vote response and builder in one: setters mutate this object and {@link #build()} returns it. */
+public class RequestVoteResponseImpl implements RpcRequests.RequestVoteResponse, RpcRequests.RequestVoteResponse.Builder {
+    private boolean granted;
+    private long term;
+
+    @Override public RpcRequests.RequestVoteResponse build() {
+        return this;
+    }
+
+    @Override public Builder setTerm(long currTerm) {
+        this.term = currTerm;
+        return this;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public Builder setGranted(boolean granted) {
+        this.granted = granted;
+        return this;
+    }
+
+    @Override public boolean getGranted() {
+        return granted;
+    }
+
+    // Always null: this implementation has no field for an error response.
+    @Override public RpcRequests.ErrorResponse getErrorResponse() {
+        return null;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java
new file mode 100644
index 0000000..e7790bb
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java
@@ -0,0 +1,32 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.entity.LocalStorageOutter;
+
+/** Term / voted-for pair implementing both {@link LocalStorageOutter.StablePBMeta} and its builder. */
+class StableMeta implements LocalStorageOutter.StablePBMeta, LocalStorageOutter.StablePBMeta.Builder {
+    private String votedFor;
+    private long term;
+
+    @Override public LocalStorageOutter.StablePBMeta build() {
+        return this;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+        return this;
+    }
+
+    // "Votedfor" (lower-case f) is the spelling of the overridden interface methods.
+    @Override public String getVotedfor() {
+        return votedFor;
+    }
+
+    @Override public Builder setVotedfor(String votedFor) {
+        this.votedFor = votedFor;
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java
new file mode 100644
index 0000000..af0be57
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java
@@ -0,0 +1,54 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+/**
+ * {@link RpcRequests.TimeoutNowRequest} plus its builder in one mutable object;
+ * {@link #build()} returns this instance rather than a copy.
+ */
+class TimeoutNowRequestImpl implements RpcRequests.TimeoutNowRequest, RpcRequests.TimeoutNowRequest.Builder {
+    private long term;
+    private String groupId;
+    private String serverId;
+    private String peerId;
+
+    @Override public RpcRequests.TimeoutNowRequest build() {
+        return this;
+    }
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+        return this;
+    }
+
+    @Override public String getServerId() {
+        return serverId;
+    }
+
+    @Override public Builder setServerId(String serverId) {
+        this.serverId = serverId;
+        return this;
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+        return this;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
new file mode 100644
index 0000000..da441ff
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
@@ -0,0 +1,36 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+class TimeoutNowResponseImpl implements RpcRequests.TimeoutNowResponse, RpcRequests.TimeoutNowResponse.Builder {
+ private long term;
+ private boolean success;
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public boolean getSuccess() {
+ return success;
+ }
+
+ @Override public RpcRequests.ErrorResponse getErrorResponse() {
+ return null;
+ }
+
+ @Override public RpcRequests.TimeoutNowResponse build() {
+ return this;
+ }
+
+ @Override public Builder setTerm(long currTerm) {
+ this.term = term;
+
+ return this;
+ }
+
+ @Override public Builder setSuccess(boolean success) {
+ this.success = success;
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
index cf9bdce..25c06d7 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
@@ -6,26 +6,34 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-/** */
+/**
+ *
+ */
public class JDKMarshaller implements Marshaller {
- /** {@inheritDoc} */
- @Override public byte[] marshall(Object o) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(o);
- oos.close();
-
- return baos.toByteArray();
+ /**
+ * {@inheritDoc}
+ */
+ @Override public byte[] marshall(Object o) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ // try-with-resources closes (and so flushes) the object stream on every path, including failures.
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(o);
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ // Read the buffer only after the object stream has been closed above.
+ return baos.toByteArray();
}
- /** {@inheritDoc} */
- @Override public Object unmarshall(byte[] raw) throws IOException{
- ByteArrayInputStream bais = new ByteArrayInputStream(raw);
- ObjectInputStream oos = new ObjectInputStream(bais);
-
+ /**
+ * {@inheritDoc}
+ */
+ @Override public <T> T unmarshall(byte[] raw) {
try {
- return oos.readObject();
- } catch (ClassNotFoundException e) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(raw);
+ ObjectInputStream oos = new ObjectInputStream(bais); // NOTE(review): JDK deserialization; confirm raw only ever comes from trusted peers.
+ return (T) oos.readObject(); // Unchecked cast: T is chosen by the caller and is not verified here.
+ } catch (Exception e) {
throw new Error(e);
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
index 6f28493..54b07ed 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
@@ -5,7 +5,7 @@ import java.io.IOException;
public interface Marshaller {
public static Marshaller DEFAULT = new JDKMarshaller();
- byte[] marshall(Object o) throws IOException;
+ byte[] marshall(Object o); // Converts o to bytes; no checked exception is declared any more.
- <T> T unmarshall(byte[] raw) throws IOException;
+ <T> T unmarshall(byte[] raw); // Converts bytes back to an object; T is inferred at the call site.
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
index 54c18e9..d650a10 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
@@ -381,7 +381,7 @@ public final class Utils {
final boolean isDir = file.isDirectory();
// can't fsync on windowns.
if (isDir && Platform.isWindows()) {
- LOG.warn("Unable to fsync directory {} on windows.", file);
+ // LOG.warn("Unable to fsync directory {} on windows.", file);
return;
}
try (final FileChannel fc = FileChannel.open(file.toPath(), isDir ? StandardOpenOption.READ
diff --git a/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory b/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory
index 8416bc1..54429b7 100644
--- a/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory
+++ b/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory
@@ -1 +1 @@
-com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory
\ No newline at end of file
+com.alipay.sofa.jraft.rpc.impl.LocalRaftRpcFactory
\ No newline at end of file
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
new file mode 100644
index 0000000..f34ebed
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
@@ -0,0 +1,154 @@
+package com.alipay.sofa.jraft.rpc;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient;
+import com.alipay.sofa.jraft.rpc.impl.LocalRpcServer;
+import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO add test for localconn.close, timeouts.
+ */
+public class LocalRpcTest {
+ private Endpoint endpoint;
+ private LocalRpcServer server;
+
+ @Before
+ public void setup() {
+ endpoint = PeerId.parsePeer("localhost:1000").getEndpoint();
+ server = new LocalRpcServer(endpoint);
+ server.registerProcessor(new Request1RpcProcessor());
+ server.registerProcessor(new Request2RpcProcessor());
+ server.init(null);
+ }
+
+ @After
+ public void teardown() {
+ server.shutdown();
+
+ assertNull(LocalRpcServer.servers.get(endpoint));
+ }
+
+ @Test
+ public void testStartStopServer() {
+ assertNotNull(LocalRpcServer.servers.get(endpoint));
+ }
+
+ @Test
+ public void testConnection() {
+ LocalRpcClient client = new LocalRpcClient();
+
+ assertFalse(client.checkConnection(endpoint));
+
+ assertTrue(client.checkConnection(endpoint, true));
+ }
+
+ @Test
+ public void testSyncProcessing() throws RemotingException, InterruptedException {
+ RpcClient client = new LocalRpcClient();
+ Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
+ assertNotNull(resp1);
+
+ Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
+ assertNotNull(resp2);
+ }
+
+ @Test
+ public void testAsyncProcessing() throws RemotingException, InterruptedException {
+ RpcClient client = new LocalRpcClient();
+
+ CountDownLatch l1 = new CountDownLatch(1);
+ AtomicReference<Response1> resp1 = new AtomicReference<>();
+ client.invokeAsync(endpoint, new Request1(), new InvokeContext(), (result, err) -> {
+ resp1.set((Response1) result);
+ l1.countDown();
+ }, 5000);
+ l1.await(5_000, TimeUnit.MILLISECONDS);
+ assertNotNull(resp1);
+
+ CountDownLatch l2 = new CountDownLatch(1);
+ AtomicReference<Response2> resp2 = new AtomicReference<>();
+ client.invokeAsync(endpoint, new Request2(), new InvokeContext(), (result, err) -> {
+ resp2.set((Response2) result);
+ l2.countDown();
+ }, 5000);
+ l2.await(5_000, TimeUnit.MILLISECONDS);
+ assertNotNull(resp2);
+ }
+
+ @Test
+ public void testDisconnect1() {
+ RpcClient client1 = new LocalRpcClient();
+ RpcClient client2 = new LocalRpcClient();
+
+ assertTrue(client1.checkConnection(endpoint, true));
+ assertTrue(client2.checkConnection(endpoint, true));
+
+ client1.shutdown();
+
+ assertFalse(client1.checkConnection(endpoint));
+ assertTrue(client2.checkConnection(endpoint));
+
+ client2.shutdown();
+
+ assertFalse(client1.checkConnection(endpoint));
+ assertFalse(client2.checkConnection(endpoint));
+ }
+
+ @Test
+ public void testDisconnect2() {
+ RpcClient client1 = new LocalRpcClient();
+ RpcClient client2 = new LocalRpcClient();
+
+ assertTrue(client1.checkConnection(endpoint, true));
+ assertTrue(client2.checkConnection(endpoint, true));
+
+ server.shutdown();
+
+ assertFalse(client1.checkConnection(endpoint));
+ assertFalse(client2.checkConnection(endpoint));
+ }
+
+ private static class Request1RpcProcessor implements RpcProcessor<Request1> {
+ @Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
+ rpcCtx.sendResponse(new Response1());
+ }
+
+ @Override public String interest() {
+ return Request1.class.getName();
+ }
+ }
+
+ private static class Request2RpcProcessor implements RpcProcessor<Request2> {
+ @Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
+ rpcCtx.sendResponse(new Response2());
+ }
+
+ @Override public String interest() {
+ return Request2.class.getName();
+ }
+ }
+
+ private static class Request1 implements Message {
+ }
+
+ private static class Request2 implements Message {
+ }
+
+ private static class Response1 implements Message {
+ }
+
+ private static class Response2 implements Message {
+ }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java
index 72b2638..e890721 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java
@@ -26,9 +26,4 @@ public class LocalLogStorageTest extends BaseLogStorageTest {
protected LogStorage newLogStorage() {
return new LocalLogStorage(this.path, new RaftOptions());
}
-
- @Test
- @Override public void testEmptyState() {
- super.testEmptyState();
- }
}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java
index d4bcc66..2639ea8 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java
@@ -19,7 +19,6 @@ package com.alipay.sofa.jraft.storage.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import com.alipay.sofa.jraft.util.Utils;
import java.io.File;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
diff --git a/modules/raft/src/test/resources/log4j2.xml b/modules/raft/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..90d27d7
--- /dev/null
+++ b/modules/raft/src/test/resources/log4j2.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN">
+    <!-- Test logging: info and above goes to the console; the file appender below is disabled. -->
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
+        </Console>
+
+        <!--<RollingFile name="RollingFile" filename="log/jraft-example.log"-->
+        <!--filepattern="log/%d{yyyyMMddHHmmss}-jraft-example.log">-->
+        <!--<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>-->
+        <!--<Policies>-->
+        <!--<SizeBasedTriggeringPolicy size="100 MB"/>-->
+        <!--</Policies>-->
+        <!--<DefaultRolloverStrategy max="20"/>-->
+        <!--</RollingFile>-->
+
+    </Appenders>
+    <Loggers>
+        <Root level="info">
+            <AppenderRef ref="Console"/>
+            <!--<AppenderRef ref="RollingFile"/>-->
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file