You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/05/19 18:23:42 UTC
[2/2] incubator-ratis git commit: RATIS-86. Support raft server
re-initialization. Contributed by Tsz Wo Nicholas Sze.
RATIS-86. Support raft server re-initialization. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/06002e67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/06002e67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/06002e67
Branch: refs/heads/master
Commit: 06002e67a3476a491e4fc0f1638123f5957c452e
Parents: 291f51b
Author: Jing Zhao <ji...@apache.org>
Authored: Fri May 19 11:23:33 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri May 19 11:23:33 2017 -0700
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 3 +
.../ratis/client/impl/ClientProtoUtils.java | 22 +++++
.../ratis/client/impl/RaftClientImpl.java | 22 ++++-
.../protocol/AdminAsynchronousProtocol.java | 27 +++++++
.../apache/ratis/protocol/AdminProtocol.java | 25 ++++++
.../ratis/protocol/ReinitializeRequest.java | 39 +++++++++
.../org/apache/ratis/util/CheckedSupplier.java | 30 +++++++
.../java/org/apache/ratis/util/IOUtils.java | 4 +-
.../java/org/apache/ratis/util/JavaUtils.java | 63 +++++++++++++++
.../org/apache/ratis/grpc/RaftGRpcService.java | 2 +
.../org/apache/ratis/grpc/RaftGrpcUtil.java | 32 ++++++--
.../apache/ratis/grpc/client/GrpcClientRpc.java | 7 +-
.../grpc/client/RaftClientProtocolClient.java | 26 ++++--
.../grpc/client/RaftClientProtocolService.java | 27 ++-----
.../ratis/grpc/server/AdminProtocolService.java | 45 +++++++++++
.../grpc/TestReinitializationWithGrpc.java | 28 +++++++
.../apache/ratis/hadooprpc/HadoopConstants.java | 4 +-
.../client/CombinedClientProtocol.java | 25 ++++++
...nedClientProtocolClientSideTranslatorPB.java | 83 +++++++++++++++++++
.../client/CombinedClientProtocolPB.java | 37 +++++++++
...nedClientProtocolServerSideTranslatorPB.java | 82 +++++++++++++++++++
.../ratis/hadooprpc/client/HadoopClientRpc.java | 10 ++-
...aftClientProtocolClientSideTranslatorPB.java | 70 ----------------
.../hadooprpc/client/RaftClientProtocolPB.java | 37 ---------
...aftClientProtocolServerSideTranslatorPB.java | 69 ----------------
.../hadooprpc/server/HadoopRpcService.java | 18 ++---
.../TestReinitializationWithHadoopRpc.java | 28 +++++++
.../ratis/netty/client/NettyClientRpc.java | 8 +-
.../ratis/netty/server/NettyRpcService.java | 9 +++
.../netty/TestReinitializationWithNetty.java | 28 +++++++
ratis-proto-shaded/src/main/proto/GRpc.proto | 6 ++
ratis-proto-shaded/src/main/proto/Hadoop.proto | 6 +-
ratis-proto-shaded/src/main/proto/Netty.proto | 1 +
ratis-proto-shaded/src/main/proto/Raft.proto | 6 ++
.../org/apache/ratis/server/RaftServer.java | 8 +-
.../apache/ratis/server/impl/LeaderState.java | 3 +-
.../ratis/server/impl/PeerConfiguration.java | 1 -
.../ratis/server/impl/RaftServerImpl.java | 2 +-
.../ratis/server/impl/RaftServerProxy.java | 85 +++++++++++++++++---
.../java/org/apache/ratis/MiniRaftCluster.java | 24 +++---
.../java/org/apache/ratis/RaftBasicTests.java | 2 +-
.../java/org/apache/ratis/RaftTestUtil.java | 23 +++++-
.../impl/RaftReconfigurationBaseTest.java | 2 +-
.../server/impl/ReinitializationBaseTest.java | 84 +++++++++++++++++++
.../ratis/server/simulation/SimulatedRpc.java | 1 -
.../server/simulation/SimulatedServerRpc.java | 27 +++----
.../TestReinitializationWithSimulatedRpc.java | 28 +++++++
47 files changed, 938 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 8f3c465..956a6de 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -52,6 +52,9 @@ public interface RaftClient extends Closeable {
/** Send set configuration request to the raft service. */
RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
+ /** Send reinitialize request to the given server. */
+ RaftClientReply reinitialize(RaftPeer[] serversInNewConf, RaftPeerId server) throws IOException;
+
/** @return a {@link Builder}. */
static Builder newBuilder() {
return new Builder();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 62a9ee4..ff49109 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -185,4 +185,26 @@ public class ClientProtoUtils {
Arrays.asList(request.getPeersInNewConf())))
.build();
}
+
+ public static ReinitializeRequest toReinitializeRequest(
+ ReinitializeRequestProto p) {
+ final RaftRpcRequestProto m = p.getRpcRequest();
+ final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList());
+ return new ReinitializeRequest(
+ new ClientId(m.getRequestorId().toByteArray()),
+ RaftPeerId.valueOf(m.getReplyId()),
+ p.getRpcRequest().getCallId(), peers);
+ }
+
+ public static ReinitializeRequestProto toReinitializeRequestProto(
+ ReinitializeRequest request) {
+ return ReinitializeRequestProto.newBuilder()
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(
+ request.getClientId().toBytes(),
+ request.getServerId().toBytes(),
+ request.getCallId()))
+ .addAllPeers(ProtoUtils.toRaftPeerProtos(
+ Arrays.asList(request.getPeersInNewConf())))
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 2125ce0..40e670d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
+import java.util.stream.Collector;
import java.util.stream.Collectors;
/** A client who sends requests to a raft service. */
@@ -86,12 +87,25 @@ final class RaftClientImpl implements RaftClient {
throws IOException {
final long callId = nextCallId();
// also refresh the rpc proxies for these peers
- clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains)
- .collect(Collectors.toCollection(ArrayList::new)));
+ addServers(peersInNewConf);
return sendRequestWithRetry(() -> new SetConfigurationRequest(
clientId, leaderId, callId, peersInNewConf));
}
+ @Override
+ public RaftClientReply reinitialize(RaftPeer[] peersInNewConf, RaftPeerId server)
+ throws IOException {
+ final long callId = nextCallId();
+ addServers(peersInNewConf);
+ return sendRequest(new ReinitializeRequest(
+ clientId, server, callId, peersInNewConf));
+ }
+
+ private void addServers(RaftPeer[] peersInNewConf) {
+ clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains)
+ .collect(Collectors.toList()));
+ }
+
private RaftClientReply sendRequestWithRetry(
Supplier<RaftClientRequest> supplier)
throws InterruptedIOException, StateMachineException {
@@ -157,6 +171,10 @@ final class RaftClientImpl implements RaftClient {
RaftPeerId newLeader) {
LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId,
newLeader, ioe);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stack trace", new Throwable("TRACE"));
+ }
+
final RaftPeerId oldLeader = request.getServerId();
if (newLeader == null && oldLeader.equals(leaderId)) {
newLeader = CollectionUtils.next(oldLeader, CollectionUtils.as(peers, RaftPeer::getId));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
new file mode 100644
index 0000000..663751b
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Asynchronous version of {@link AdminProtocol}. */
+public interface AdminAsynchronousProtocol {
+ CompletableFuture<RaftClientReply> reinitializeAsync(
+ ReinitializeRequest request) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
new file mode 100644
index 0000000..0e7d6b6
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/** For server administration. */
+public interface AdminProtocol {
+ RaftClientReply reinitialize(ReinitializeRequest request) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
new file mode 100644
index 0000000..0a89340
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.util.Arrays;
+
+public class ReinitializeRequest extends RaftClientRequest {
+ private final RaftPeer[] peers;
+
+ public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
+ long callId, RaftPeer[] peers) {
+ super(clientId, serverId, callId, null);
+ this.peers = peers;
+ }
+
+ public RaftPeer[] getPeersInNewConf() {
+ return peers;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", peers:" + Arrays.asList(getPeersInNewConf());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
new file mode 100644
index 0000000..0c9de31
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.function.Supplier;
+
+/** A {@link Supplier} with a throws-clause. */
+@FunctionalInterface
+public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
+ /**
+ * The same as {@link Supplier#get()}
+ * except that this method is declared with a throws-clause.
+ */
+ OUTPUT get() throws THROWABLE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index ba5e78e..4976be8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -43,7 +43,9 @@ public interface IOUtils {
}
static IOException asIOException(Throwable t) {
- return t instanceof IOException? (IOException)t : new IOException(t);
+ return t == null? null
+ : t instanceof IOException? (IOException)t
+ : new IOException(t);
}
static IOException toIOException(ExecutionException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
new file mode 100644
index 0000000..5da2012
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * General Java utility methods.
+ */
+public interface JavaUtils {
+ Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
+
+ /**
+ * Invoke {@link Callable#call()} and, if there is any,
+ * wrap the checked exception in a {@link RuntimeException}.
+ */
+ static <T> T callAsUnchecked(Callable<T> callable) {
+ try {
+ return callable.call();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Get the value from the future and then consume it; on failure, log a warning and skip the consumer.
+ */
+ static <T> void getAndConsume(CompletableFuture<T> future, Consumer<T> consumer) {
+ final T t;
+ try {
+ t = future.get();
+ } catch (Exception ignored) {
+ LOG.warn("Failed to get()", ignored);
+ return;
+ }
+ consumer.accept(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 0deb3f4..96a7a45 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -18,6 +18,7 @@
package org.apache.ratis.grpc;
import org.apache.ratis.grpc.client.RaftClientProtocolService;
+import org.apache.ratis.grpc.server.AdminProtocolService;
import org.apache.ratis.grpc.server.RaftServerProtocolClient;
import org.apache.ratis.grpc.server.RaftServerProtocolService;
import org.apache.ratis.protocol.RaftPeer;
@@ -82,6 +83,7 @@ public class RaftGRpcService implements RaftServerRpc {
server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)
.addService(new RaftServerProtocolService(selfId, raftServer))
.addService(new RaftClientProtocolService(selfId, raftServer))
+ .addService(new AdminProtocolService(selfId, raftServer))
.build();
// start service to determine the port (in case port is configured as 0)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
index b89c297..fdc9ce8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
@@ -17,20 +17,26 @@
*/
package org.apache.ratis.grpc;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.shaded.io.grpc.Metadata;
import org.apache.ratis.shaded.io.grpc.Status;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.util.CheckedSupplier;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.ReflectionUtils;
import org.apache.ratis.util.StringUtils;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
-public class RaftGrpcUtil {
- public static final Metadata.Key<String> EXCEPTION_TYPE_KEY =
+public interface RaftGrpcUtil {
+ Metadata.Key<String> EXCEPTION_TYPE_KEY =
Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
- public static StatusRuntimeException wrapException(Throwable t) {
+ static StatusRuntimeException wrapException(Throwable t) {
Metadata trailers = new Metadata();
trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
return new StatusRuntimeException(
@@ -38,7 +44,7 @@ public class RaftGrpcUtil {
trailers);
}
- public static IOException unwrapException(StatusRuntimeException se) {
+ static IOException unwrapException(StatusRuntimeException se) {
final Metadata trailers = se.getTrailers();
final Status status = se.getStatus();
if (trailers != null && status != null) {
@@ -57,7 +63,7 @@ public class RaftGrpcUtil {
return new IOException(se);
}
- public static IOException unwrapIOException(Throwable t) {
+ static IOException unwrapIOException(Throwable t) {
final IOException e;
if (t instanceof StatusRuntimeException) {
e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
@@ -67,4 +73,20 @@ public class RaftGrpcUtil {
return e;
}
+ static void asyncCall(
+ StreamObserver<RaftClientReplyProto> responseObserver,
+ CheckedSupplier<CompletableFuture<RaftClientReply>, IOException> supplier) {
+ try {
+ supplier.get().whenCompleteAsync((reply, exception) -> {
+ if (exception != null) {
+ responseObserver.onError(RaftGrpcUtil.wrapException(exception));
+ } else {
+ responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply));
+ responseObserver.onCompleted();
+ }
+ });
+ } catch (Exception e) {
+ responseObserver.onError(RaftGrpcUtil.wrapException(e));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index b28415c..b30640b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -22,6 +22,7 @@ import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
@@ -48,7 +49,11 @@ public class GrpcClientRpc implements RaftClientRpc {
throws IOException {
final RaftPeerId serverId = request.getServerId();
final RaftClientProtocolClient proxy = proxies.getProxy(serverId);
- if (request instanceof SetConfigurationRequest) {
+ if (request instanceof ReinitializeRequest) {
+ RaftProtos.ReinitializeRequestProto proto =
+ toReinitializeRequestProto((ReinitializeRequest) request);
+ return toRaftClientReply(proxy.reinitialize(proto));
+ } else if (request instanceof SetConfigurationRequest) {
SetConfigurationRequestProto setConf =
toSetConfigurationRequestProto((SetConfigurationRequest) request);
return toRaftClientReply(proxy.setConfiguration(setConf));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index 74fb253..21254f3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -17,18 +17,22 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.CheckedSupplier;
import java.io.Closeable;
import java.io.IOException;
@@ -38,6 +42,7 @@ public class RaftClientProtocolClient implements Closeable {
private final ManagedChannel channel;
private final RaftClientProtocolServiceBlockingStub blockingStub;
private final RaftClientProtocolServiceStub asyncStub;
+ private final AdminProtocolServiceBlockingStub adminBlockingStub;
public RaftClientProtocolClient(RaftPeer target) {
this.target = target;
@@ -45,6 +50,7 @@ public class RaftClientProtocolClient implements Closeable {
.usePlaintext(true).build();
blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
+ adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
}
@Override
@@ -52,12 +58,22 @@ public class RaftClientProtocolClient implements Closeable {
channel.shutdownNow();
}
- public RaftClientReplyProto setConfiguration(
+ RaftClientReplyProto reinitialize(
+ ReinitializeRequestProto request) throws IOException {
+ return blockingCall(() -> adminBlockingStub.reinitialize(request));
+ }
+
+ RaftClientReplyProto setConfiguration(
SetConfigurationRequestProto request) throws IOException {
+ return blockingCall(() -> blockingStub.setConfiguration(request));
+ }
+
+ private static RaftClientReplyProto blockingCall(
+ CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
+ ) throws IOException {
try {
- return blockingStub.setConfiguration(request);
+ return supplier.get();
} catch (StatusRuntimeException e) {
- // unwrap StatusRuntimeException
throw RaftGrpcUtil.unwrapException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index 97e32c1..e11a9cf 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -17,16 +17,17 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,22 +75,10 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
}
@Override
- public void setConfiguration(SetConfigurationRequestProto request,
+ public void setConfiguration(SetConfigurationRequestProto proto,
StreamObserver<RaftClientReplyProto> responseObserver) {
- try {
- CompletableFuture<RaftClientReply> future = protocol.setConfigurationAsync(
- ClientProtoUtils.toSetConfigurationRequest(request));
- future.whenCompleteAsync((reply, exception) -> {
- if (exception != null) {
- responseObserver.onError(RaftGrpcUtil.wrapException(exception));
- } else {
- responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply));
- responseObserver.onCompleted();
- }
- });
- } catch (Exception e) {
- responseObserver.onError(RaftGrpcUtil.wrapException(e));
- }
+ final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
+ RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
new file mode 100644
index 0000000..4e7ff9a
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.AdminAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.ReinitializeRequest;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
+
+public class AdminProtocolService extends AdminProtocolServiceImplBase {
+ private final RaftPeerId id;
+ private final AdminAsynchronousProtocol protocol;
+
+ public AdminProtocolService(RaftPeerId id, AdminAsynchronousProtocol protocol) {
+ this.id = id;
+ this.protocol = protocol;
+ }
+
+ @Override
+ public void reinitialize(ReinitializeRequestProto proto,
+ StreamObserver<RaftClientReplyProto> responseObserver) {
+ final ReinitializeRequest request = ClientProtoUtils.toReinitializeRequest(proto);
+ RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.reinitializeAsync(request));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
new file mode 100644
index 0000000..27cbf1e
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+
+public class TestReinitializationWithGrpc extends ReinitializationBaseTest {
+ @Override
+ public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+ return MiniRaftClusterWithGRpc.FACTORY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
index a50b938..1f9c15d 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
@@ -24,6 +24,6 @@ public interface HadoopConstants {
= "raft.client.kerberos.principal";
String RAFT_SERVER_PROTOCOL_NAME
= "org.apache.hadoop.raft.server.protocol.RaftServerProtocol";
- String RAFT_CLIENT_PROTOCOL_NAME
- = "org.apache.hadoop.raft.protocol.RaftClientProtocol";
+ String COMBINED_CLIENT_PROTOCOL_NAME
+ = "org.apache.ratis.hadooprpc.client.CombinedClientProtocol";
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java
new file mode 100644
index 0000000..6281987
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.ratis.protocol.AdminProtocol;
+import org.apache.ratis.protocol.RaftClientProtocol;
+
+public interface CombinedClientProtocol
+ extends RaftClientProtocol, AdminProtocol {
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..8d1eff2
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.hadooprpc.Proxy;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.ReinitializeRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.shaded.com.google.common.base.Function;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.util.CheckedFunction;
+import org.apache.ratis.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+@InterfaceAudience.Private
+public class CombinedClientProtocolClientSideTranslatorPB
+ extends Proxy<CombinedClientProtocolPB>
+ implements CombinedClientProtocol {
+ private static final Logger LOG = LoggerFactory.getLogger(CombinedClientProtocolClientSideTranslatorPB.class);
+
+ public CombinedClientProtocolClientSideTranslatorPB(
+ String addressStr, Configuration conf) throws IOException {
+ super(CombinedClientProtocolPB.class, addressStr, conf);
+ }
+
+ @Override
+ public RaftClientReply submitClientRequest(RaftClientRequest request)
+ throws IOException {
+ return handleRequest(request, ClientProtoUtils::toRaftClientRequestProto,
+ p -> getProtocol().submitClientRequest(null, p));
+ }
+
+ @Override
+ public RaftClientReply setConfiguration(SetConfigurationRequest request)
+ throws IOException {
+ return handleRequest(request, ClientProtoUtils::toSetConfigurationRequestProto,
+ p -> getProtocol().setConfiguration(null, p));
+ }
+
+ @Override
+ public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException {
+ return handleRequest(request, ClientProtoUtils::toReinitializeRequestProto,
+ p -> getProtocol().reinitialize(null, p));
+ }
+
+ static <REQUEST extends RaftClientRequest, PROTO> RaftClientReply handleRequest(
+ REQUEST request, Function<REQUEST, PROTO> toProto,
+ CheckedFunction<PROTO, RaftClientReplyProto, ServiceException> handler)
+ throws IOException {
+ final PROTO proto = toProto.apply(request);
+ try {
+ final RaftClientReplyProto reply = handler.apply(proto);
+ return ClientProtoUtils.toRaftClientReply(reply);
+ } catch (ServiceException se) {
+ LOG.trace("Failed to handle " + request, se);
+ throw ProtoUtils.toIOException(se);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java
new file mode 100644
index 0000000..e9af3b0
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.ratis.hadooprpc.HadoopConstants;
+import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.CombinedClientProtocolService;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@KerberosInfo(
+ serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY,
+ clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY)
+@ProtocolInfo(
+ protocolName = HadoopConstants.COMBINED_CLIENT_PROTOCOL_NAME,
+ protocolVersion = 1)
+public interface CombinedClientProtocolPB extends
+ CombinedClientProtocolService.BlockingInterface {
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..ef9e733
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.shaded.com.google.protobuf.RpcController;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+
+@InterfaceAudience.Private
+public class CombinedClientProtocolServerSideTranslatorPB
+ implements CombinedClientProtocolPB {
+ private final RaftServer impl;
+
+ public CombinedClientProtocolServerSideTranslatorPB(RaftServer impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public RaftClientReplyProto submitClientRequest(
+ RpcController unused, RaftClientRequestProto proto)
+ throws ServiceException {
+ final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto);
+ try {
+ final RaftClientReply reply = impl.submitClientRequest(request);
+ return ClientProtoUtils.toRaftClientReplyProto(reply);
+ } catch(IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ @Override
+ public RaftClientReplyProto setConfiguration(
+ RpcController unused, SetConfigurationRequestProto proto)
+ throws ServiceException {
+ final SetConfigurationRequest request;
+ try {
+ request = ClientProtoUtils.toSetConfigurationRequest(proto);
+ final RaftClientReply reply = impl.setConfiguration(request);
+ return ClientProtoUtils.toRaftClientReplyProto(reply);
+ } catch(IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ @Override
+ public RaftClientReplyProto reinitialize(
+ RpcController controller, ReinitializeRequestProto proto)
+ throws ServiceException {
+ final ReinitializeRequest request;
+ try {
+ request = ClientProtoUtils.toReinitializeRequest(proto);
+ final RaftClientReply reply = impl.reinitialize(request);
+ return ClientProtoUtils.toRaftClientReplyProto(reply);
+ } catch(IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
index 25c0ecd..3a2d6fc 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
@@ -27,21 +27,23 @@ import java.io.IOException;
public class HadoopClientRpc implements RaftClientRpc {
- private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies;
+ private final PeerProxyMap<CombinedClientProtocolClientSideTranslatorPB> proxies;
public HadoopClientRpc(final Configuration conf) {
this.proxies = new PeerProxyMap<>(
- p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf));
+ p -> new CombinedClientProtocolClientSideTranslatorPB(p.getAddress(), conf));
}
@Override
public RaftClientReply sendRequest(RaftClientRequest request)
throws IOException {
final RaftPeerId serverId = request.getServerId();
- final RaftClientProtocolClientSideTranslatorPB proxy =
+ final CombinedClientProtocolClientSideTranslatorPB proxy =
proxies.getProxy(serverId);
try {
- if (request instanceof SetConfigurationRequest) {
+ if (request instanceof ReinitializeRequest) {
+ return proxy.reinitialize((ReinitializeRequest) request);
+ } else if (request instanceof SetConfigurationRequest) {
return proxy.setConfiguration((SetConfigurationRequest) request);
} else {
return proxy.submitClientRequest(request);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
deleted file mode 100644
index a5c1a13..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.hadooprpc.Proxy;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.util.ProtoUtils;
-
-@InterfaceAudience.Private
-public class RaftClientProtocolClientSideTranslatorPB
- extends Proxy<RaftClientProtocolPB>
- implements RaftClientProtocol {
-
- public RaftClientProtocolClientSideTranslatorPB(
- String addressStr, Configuration conf) throws IOException {
- super(RaftClientProtocolPB.class, addressStr, conf);
- }
-
- @Override
- public RaftClientReply submitClientRequest(RaftClientRequest request)
- throws IOException {
- final RaftClientRequestProto p = ClientProtoUtils.toRaftClientRequestProto(request);
- try {
- final RaftClientReplyProto reply = getProtocol().submitClientRequest(null, p);
- return ClientProtoUtils.toRaftClientReply(reply);
- } catch (ServiceException se) {
- throw ProtoUtils.toIOException(se);
- }
- }
-
- @Override
- public RaftClientReply setConfiguration(SetConfigurationRequest request)
- throws IOException {
- final SetConfigurationRequestProto p
- = ClientProtoUtils.toSetConfigurationRequestProto(request);
- try {
- final RaftClientReplyProto reply = getProtocol().setConfiguration(null, p);
- return ClientProtoUtils.toRaftClientReply(reply);
- } catch (ServiceException se) {
- throw ProtoUtils.toIOException(se);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
deleted file mode 100644
index 908cd99..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.client;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.security.KerberosInfo;
-import org.apache.ratis.hadooprpc.HadoopConstants;
-import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-@KerberosInfo(
- serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY,
- clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY)
-@ProtocolInfo(
- protocolName = HadoopConstants.RAFT_CLIENT_PROTOCOL_NAME,
- protocolVersion = 1)
-public interface RaftClientProtocolPB extends
- RaftClientProtocolService.BlockingInterface {
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
deleted file mode 100644
index 08cf589..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.shaded.com.google.protobuf.RpcController;
-import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-
-@InterfaceAudience.Private
-public class RaftClientProtocolServerSideTranslatorPB
- implements RaftClientProtocolPB {
- private final RaftClientProtocol impl;
-
- public RaftClientProtocolServerSideTranslatorPB(RaftClientProtocol impl) {
- this.impl = impl;
- }
-
- @Override
- public RaftClientReplyProto submitClientRequest(
- RpcController unused, RaftClientRequestProto proto)
- throws ServiceException {
- final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto);
- try {
- final RaftClientReply reply = impl.submitClientRequest(request);
- return ClientProtoUtils.toRaftClientReplyProto(reply);
- } catch(IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-
- @Override
- public RaftClientReplyProto setConfiguration(
- RpcController unused, SetConfigurationRequestProto proto)
- throws ServiceException {
- final SetConfigurationRequest request;
- try {
- request = ClientProtoUtils.toSetConfigurationRequest(proto);
- final RaftClientReply reply = impl.setConfiguration(request);
- return ClientProtoUtils.toRaftClientReplyProto(reply);
- } catch(IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index e31a03a..15afde8 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -22,9 +22,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
import org.apache.hadoop.ipc.RPC;
import org.apache.ratis.hadooprpc.HadoopConfigKeys;
import org.apache.ratis.hadooprpc.Proxy;
-import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB;
-import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
-import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.hadooprpc.client.CombinedClientProtocolPB;
+import org.apache.ratis.hadooprpc.client.CombinedClientProtocolServerSideTranslatorPB;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -35,7 +34,7 @@ import org.apache.ratis.shaded.com.google.protobuf.BlockingService;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService;
+import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.CombinedClientProtocolService;
import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService;
import org.apache.ratis.util.CheckedFunction;
import org.apache.ratis.util.CodeInjectionForTesting;
@@ -138,13 +137,12 @@ public class HadoopRpcService implements RaftServerRpc {
.build();
}
- private void addRaftClientProtocol(RaftClientProtocol clientProtocol, Configuration conf) {
- final Class<?> protocol = RaftClientProtocolPB.class;
- RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class);
+ private void addRaftClientProtocol(RaftServer server, Configuration conf) {
+ final Class<?> protocol = CombinedClientProtocolPB.class;
+ RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngineShaded.class);
- final BlockingService service
- = RaftClientProtocolService.newReflectiveBlockingService(
- new RaftClientProtocolServerSideTranslatorPB(clientProtocol));
+ final BlockingService service = CombinedClientProtocolService.newReflectiveBlockingService(
+ new CombinedClientProtocolServerSideTranslatorPB(server));
ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
new file mode 100644
index 0000000..6efb012
--- /dev/null
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+
+public class TestReinitializationWithHadoopRpc extends ReinitializationBaseTest {
+ @Override
+ public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+ return MiniRaftClusterWithHadoopRpc.FACTORY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index 74afddc..14218ad 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -23,6 +23,7 @@ import org.apache.ratis.netty.NettyRpcProxy;
import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
@@ -38,7 +39,12 @@ public class NettyClientRpc implements RaftClientRpc {
final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder();
final RaftRpcRequestProto rpcRequest;
- if (request instanceof SetConfigurationRequest) {
+ if (request instanceof ReinitializeRequest) {
+ final ReinitializeRequestProto proto = ClientProtoUtils.toReinitializeRequestProto(
+ (ReinitializeRequest)request);
+ b.setReinitializeRequest(proto);
+ rpcRequest = proto.getRpcRequest();
+ } else if (request instanceof SetConfigurationRequest) {
final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto(
(SetConfigurationRequest)request);
b.setSetConfigurationRequest(proto);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 9504241..b8028b6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -200,6 +200,15 @@ public final class NettyRpcService implements RaftServerRpc {
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
.build();
}
+ case REINITIALIZEREQUEST: {
+ final ReinitializeRequestProto request = proto.getReinitializeRequest();
+ rpcRequest = request.getRpcRequest();
+ final RaftClientReply reply = server.reinitialize(
+ ClientProtoUtils.toReinitializeRequest(request));
+ return RaftNettyServerReplyProto.newBuilder()
+ .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
+ .build();
+ }
case RAFTNETTYSERVERREQUEST_NOT_SET:
throw new IllegalArgumentException("Request case not set in proto: "
+ proto.getRaftNettyServerRequestCase());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
new file mode 100644
index 0000000..c378749
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+
+public class TestReinitializationWithNetty extends ReinitializationBaseTest { // binds ReinitializationBaseTest to MiniRaftClusterWithNetty
+ @Override
+ public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+ return MiniRaftClusterWithNetty.FACTORY; // the only customization: which cluster factory the base test uses
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/GRpc.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto
index 267f579..599227b 100644
--- a/ratis-proto-shaded/src/main/proto/GRpc.proto
+++ b/ratis-proto-shaded/src/main/proto/GRpc.proto
@@ -43,3 +43,9 @@ service RaftServerProtocolService {
rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
returns(ratis.common.InstallSnapshotReplyProto) {}
}
+
+service AdminProtocolService {
+ // Client-to-server admin RPC: asks the server to reinitialize itself; the outcome comes back as a RaftClientReplyProto.
+ rpc reinitialize(ratis.common.ReinitializeRequestProto)
+ returns(ratis.common.RaftClientReplyProto) {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Hadoop.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Hadoop.proto b/ratis-proto-shaded/src/main/proto/Hadoop.proto
index b85b9a2..48cfbf4 100644
--- a/ratis-proto-shaded/src/main/proto/Hadoop.proto
+++ b/ratis-proto-shaded/src/main/proto/Hadoop.proto
@@ -24,12 +24,15 @@ package ratis.hadoop;
import "Raft.proto";
-service RaftClientProtocolService {
+service CombinedClientProtocolService {
rpc submitClientRequest(ratis.common.RaftClientRequestProto)
returns(ratis.common.RaftClientReplyProto);
rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
returns(ratis.common.RaftClientReplyProto);
+
+ rpc reinitialize(ratis.common.ReinitializeRequestProto)
+ returns(ratis.common.RaftClientReplyProto);
}
service RaftServerProtocolService {
@@ -42,3 +45,4 @@ service RaftServerProtocolService {
rpc installSnapshot(ratis.common.InstallSnapshotRequestProto)
returns(ratis.common.InstallSnapshotReplyProto);
}
+
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Netty.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Netty.proto b/ratis-proto-shaded/src/main/proto/Netty.proto
index d1634d7..4de770f 100644
--- a/ratis-proto-shaded/src/main/proto/Netty.proto
+++ b/ratis-proto-shaded/src/main/proto/Netty.proto
@@ -35,6 +35,7 @@ message RaftNettyServerRequestProto {
ratis.common.InstallSnapshotRequestProto installSnapshotRequest = 3;
ratis.common.RaftClientRequestProto raftClientRequest = 4;
ratis.common.SetConfigurationRequestProto setConfigurationRequest = 5;
+ ratis.common.ReinitializeRequestProto reinitializeRequest = 6;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index f8dcf62..b53181f 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -180,3 +180,9 @@ message SetConfigurationRequestProto {
RaftRpcRequestProto rpcRequest = 1;
repeated RaftPeerProto peers = 2;
}
+
+// Reinitialize request: the common RaftRpcRequestProto plus a list of peers (presumably the new configuration's peers -- confirm against ClientProtoUtils).
+message ReinitializeRequestProto {
+ RaftRpcRequestProto rpcRequest = 1;
+ repeated RaftPeerProto peers = 2;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index dbd32b7..0899dd1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -19,10 +19,7 @@ package org.apache.ratis.server;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.server.impl.ServerImplUtils;
@@ -35,7 +32,8 @@ import java.util.Objects;
/** Raft server interface */
public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
- RaftClientProtocol, RaftClientAsynchronousProtocol {
+ RaftClientProtocol, RaftClientAsynchronousProtocol,
+ AdminProtocol, AdminAsynchronousProtocol {
/** @return the server ID. */
RaftPeerId getId();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 4c19b7e..e9784bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -448,8 +448,7 @@ public class LeaderState {
}
// the pending request handler will send NotLeaderException for
// pending client requests when it stops
- // TODO should close impl instead of proxy
- server.getProxy().close();
+ server.shutdown();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 82f546b..06aeb62 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -38,7 +38,6 @@ class PeerConfiguration {
map.put(p.getId(), p);
}
this.peers = Collections.unmodifiableMap(map);
- Preconditions.assertTrue(!this.peers.isEmpty());
}
Collection<RaftPeer> getPeers() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d62b207..9685fbc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -445,7 +445,7 @@ public class RaftServerImpl implements RaftServerProtocol,
return waitForReply(getId(), request, submitClientRequestAsync(request));
}
- private static RaftClientReply waitForReply(RaftPeerId id,
+ static RaftClientReply waitForReply(RaftPeerId id,
RaftClientRequest request, CompletableFuture<RaftClientReply> future)
throws IOException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 36adf11..5afb737 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -26,23 +26,29 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
public class RaftServerProxy implements RaftServer {
public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class);
private final RaftPeerId id;
- private final RaftServerImpl impl;
private final StateMachine stateMachine;
private final RaftProperties properties;
private final RaftServerRpc serverRpc;
private final ServerFactory factory;
+ private volatile CompletableFuture<RaftServerImpl> impl;
+ private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>();
+
RaftServerProxy(RaftPeerId id, StateMachine stateMachine,
RaftConfiguration raftConf, RaftProperties properties, Parameters parameters)
throws IOException {
@@ -52,10 +58,15 @@ public class RaftServerProxy implements RaftServer {
final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters));
- this.impl = new RaftServerImpl(id, this, raftConf, properties);
+
+ this.impl = CompletableFuture.completedFuture(initImpl(raftConf));
this.serverRpc = initRaftServerRpc(factory, this, raftConf);
}
+ private RaftServerImpl initImpl(RaftConfiguration raftConf) throws IOException { // builds a new RaftServerImpl for this proxy's id and properties
+ return new RaftServerImpl(id, this, raftConf, properties);
+ }
+
private static RaftServerRpc initRaftServerRpc(
ServerFactory factory, RaftServer server, RaftConfiguration raftConf) {
final RaftServerRpc rpc = factory.newRaftServerRpc(server);
@@ -67,6 +78,11 @@ public class RaftServerProxy implements RaftServer {
}
@Override
+ public RaftPeerId getId() {
+ return id;
+ }
+
+ @Override
public RpcType getRpcType() {
return getFactory().getRpcType();
}
@@ -90,24 +106,25 @@ public class RaftServerProxy implements RaftServer {
return serverRpc;
}
- public RaftServerImpl getImpl() {
- return impl;
+ public RaftServerImpl getImpl() throws IOException { // impl is a future because reinitializeAsync swaps it for a new, initially incomplete one
+ try {
+ return impl.get(); // blocks until the current impl future is completed
+ } catch (InterruptedException e) {
+ throw IOUtils.toInterruptedIOException(getId() + ": getImpl interrupted.", e);
+ } catch (ExecutionException e) {
+ throw IOUtils.asIOException(e); // the future was completed exceptionally, e.g. by a failed reinitialize
+ }
}
@Override
public void start() {
- getImpl().start();
+ JavaUtils.getAndConsume(impl, RaftServerImpl::start);
getServerRpc().start();
}
@Override
- public RaftPeerId getId() {
- return id;
- }
-
- @Override
public void close() {
- getImpl().shutdown();
+ JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown);
try {
getServerRpc().close();
} catch (IOException ignored) {
@@ -133,6 +150,46 @@ public class RaftServerProxy implements RaftServer {
return getImpl().setConfiguration(request);
}
+ @Override
+ public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException { // synchronous wrapper around reinitializeAsync
+ return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request)); // hands the async future to waitForReply
+ }
+
+ @Override // Shuts down the current impl and starts a new one built from the request's peers; returns the reply asynchronously.
+ public CompletableFuture<RaftClientReply> reinitializeAsync(
+ ReinitializeRequest request) throws IOException {
+ if (!reinitializeRequest.compareAndSet(null, request)) { // at most one reinitialize may be in flight
+ throw new IOException("Another reinitialize is already in progress.");
+ }
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ final CompletableFuture<RaftServerImpl> oldImpl = impl;
+ impl = new CompletableFuture<>();
+ JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown);
+ // From here on getImpl() callers wait on the new, still incomplete future until it is completed below.
+ final RaftConfiguration newConf = RaftConfiguration.newBuilder()
+ .setConf(request.getPeersInNewConf()).build();
+ final RaftServerImpl newImpl;
+ try {
+ newImpl = initImpl(newConf);
+ // addPeers/start stay inside the try: a failure outside it would leave impl incomplete and block getImpl() forever.
+ getServerRpc().addPeers(newConf.getPeers());
+ newImpl.start();
+ } catch (IOException | RuntimeException e) {
+ final RaftException re = new RaftException(
+ "Failed to reinitialize, request=" + request, e);
+ impl.completeExceptionally(new IOException(
+ "Server " + getId() + " is not initialized.", re));
+ return new RaftClientReply(request, re);
+ }
+ impl.complete(newImpl);
+ return new RaftClientReply(request, (Message) null);
+ } finally {
+ reinitializeRequest.set(null); // always release the in-progress marker so a later reinitialize can run
+ }
+ });
+ }
+
/**
* Handle a raft configuration change request from client.
*/
@@ -162,6 +219,10 @@ public class RaftServerProxy implements RaftServer {
@Override
public String toString() {
- return getClass().getSimpleName() + ":" + getId().toString();
+ try {
+ return getImpl().toString(); // prefer the impl's own description when it is available
+ } catch (IOException ignored) { // impl unavailable (interrupted or failed): fall back to the proxy's class name and id
+ return getClass().getSimpleName() + ":" + getId();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index ef0e454..83fcf54 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -32,7 +32,6 @@ import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.statemachine.BaseStateMachine;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.*;
-import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -335,7 +334,7 @@ public abstract class MiniRaftCluster {
public RaftServerImpl getLeader() {
final List<RaftServerImpl> leaders = new ArrayList<>();
- getServersAliveStream()
+ getServerAliveStream()
.filter(RaftServerImpl::isLeader)
.forEach(s -> {
if (leaders.isEmpty()) {
@@ -353,9 +352,9 @@ public abstract class MiniRaftCluster {
});
if (leaders.isEmpty()) {
return null;
- } else if (leaders.size() != 1) {
- Assert.fail(printServers() + leaders.toString()
- + "leaders.size() = " + leaders.size() + " != 1");
+ } else if (leaders.size() > 1) {
+ throw new IllegalStateException(printServers() + leaders
+ + ", leaders.size() = " + leaders.size() + " > 1");
}
return leaders.get(0);
}
@@ -366,7 +365,7 @@ public abstract class MiniRaftCluster {
}
public List<RaftServerImpl> getFollowers() {
- return getServersAliveStream()
+ return getServerAliveStream()
.filter(RaftServerImpl::isFollower)
.collect(Collectors.toList());
}
@@ -376,13 +375,14 @@ public abstract class MiniRaftCluster {
}
public Iterable<RaftServerImpl> iterateServerImpls() {
- return CollectionUtils.as(getServers(), RaftServerProxy::getImpl);
+ return CollectionUtils.as(getServers(), RaftTestUtil::getImplAsUnchecked);
}
- public Stream<RaftServerImpl> getServersAliveStream() {
- return getServers().stream()
- .map(RaftServerProxy::getImpl)
- .filter(RaftServerImpl::isAlive);
+ public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) { // maps each proxy to its RaftServerImpl
+ return servers.stream().map(RaftTestUtil::getImplAsUnchecked); // presumably rethrows getImpl's IOException unchecked -- confirm in RaftTestUtil
+ }
+ public Stream<RaftServerImpl> getServerAliveStream() { // impls of this cluster's servers, restricted to those reporting isAlive()
+ return getServerStream(getServers()).filter(RaftServerImpl::isAlive);
+ }
public RaftServerProxy getServer(RaftPeerId id) {
@@ -408,7 +408,7 @@ public abstract class MiniRaftCluster {
public void shutdown() {
LOG.info("Stopping " + getClass().getSimpleName());
- getServersAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close);
+ getServerAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close);
if (ExitUtils.isTerminated()) {
LOG.error("Test resulted in an unexpected exit",
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 9c69d03..26eda15 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -97,7 +97,7 @@ public abstract class RaftBasicTests {
Thread.sleep(cluster.getMaxTimeout() + 100);
LOG.info(cluster.printAllLogs());
- cluster.getServersAliveStream()
+ cluster.getServerAliveStream()
.map(s -> s.getState().getLog())
.forEach(log -> RaftTestUtil.assertLogEntries(log,
log.getEntries(1, Long.MAX_VALUE), 1, term, messages));