You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2018/09/20 05:01:15 UTC
[1/3] incubator-ratis git commit: RATIS-324. Rename grpc classes.
Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 974919e5e -> ed8e60dad
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
deleted file mode 100644
index d047803..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
-import org.apache.ratis.util.ProtoUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
-
-public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase {
- public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class);
-
- private final Supplier<RaftPeerId> idSupplier;
- private final RaftServer server;
-
- public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server) {
- this.idSupplier = idSupplier;
- this.server = server;
- }
-
- RaftPeerId getId() {
- return idSupplier.get();
- }
-
- @Override
- public void requestVote(RequestVoteRequestProto request,
- StreamObserver<RequestVoteReplyProto> responseObserver) {
- try {
- final RequestVoteReplyProto reply = server.requestVote(request);
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- } catch (Throwable e) {
- RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed requestVote " + ProtoUtils.toString(request.getServerRequest()), e);
- responseObserver.onError(RaftGrpcUtil.wrapException(e));
- }
- }
-
- @Override
- public StreamObserver<AppendEntriesRequestProto> appendEntries(
- StreamObserver<AppendEntriesReplyProto> responseObserver) {
- return new StreamObserver<AppendEntriesRequestProto>() {
- private final AtomicReference<CompletableFuture<Void>> previousOnNext =
- new AtomicReference<>(CompletableFuture.completedFuture(null));
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
- @Override
- public void onNext(AppendEntriesRequestProto request) {
- final CompletableFuture<Void> current = new CompletableFuture<>();
- final CompletableFuture<Void> previous = previousOnNext.getAndSet(current);
- try {
- server.appendEntriesAsync(request).thenCombine(previous,
- (reply, v) -> {
- if (!isClosed.get()) {
- responseObserver.onNext(reply);
- }
- current.complete(null);
- return null;
- });
- } catch (Throwable e) {
- RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed appendEntries " + ProtoUtils.toString(request.getServerRequest()), e);
- responseObserver.onError(RaftGrpcUtil.wrapException(e, request.getServerRequest().getCallId()));
- current.completeExceptionally(e);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- // for now we just log a msg
- RaftGrpcUtil.warn(LOG, () -> getId() + ": appendEntries onError", t);
- }
-
- @Override
- public void onCompleted() {
- if (isClosed.compareAndSet(false, true)) {
- LOG.info("{}: appendEntries completed", getId());
- responseObserver.onCompleted();
- }
- }
- };
- }
-
- @Override
- public StreamObserver<InstallSnapshotRequestProto> installSnapshot(
- StreamObserver<InstallSnapshotReplyProto> responseObserver) {
- return new StreamObserver<InstallSnapshotRequestProto>() {
- @Override
- public void onNext(InstallSnapshotRequestProto request) {
- try {
- final InstallSnapshotReplyProto reply = server.installSnapshot(request);
- responseObserver.onNext(reply);
- } catch (Throwable e) {
- RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed installSnapshot " + ProtoUtils.toString(request.getServerRequest()), e);
- responseObserver.onError(RaftGrpcUtil.wrapException(e));
- }
- }
-
- @Override
- public void onError(Throwable t) {
- RaftGrpcUtil.warn(LOG, () -> getId() + ": installSnapshot onError", t);
- }
-
- @Override
- public void onCompleted() {
- LOG.info("{}: installSnapshot completed", getId());
- responseObserver.onCompleted();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
deleted file mode 100644
index a2c419f..0000000
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.*;
-import org.apache.ratis.statemachine.StateMachine;
-
-import java.io.IOException;
-
-public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
- public static final Factory<MiniRaftClusterWithGRpc> FACTORY
- = new Factory<MiniRaftClusterWithGRpc>() {
- @Override
- public MiniRaftClusterWithGRpc newCluster(
- String[] ids, RaftProperties prop) {
- RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
- return new MiniRaftClusterWithGRpc(ids, prop);
- }
- };
-
- public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGRpc> {
- @Override
- default Factory<MiniRaftClusterWithGRpc> getFactory() {
- return FACTORY;
- }
- }
-
- public static final DelayLocalExecutionInjection sendServerRequestInjection =
- new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST);
-
- private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties) {
- super(ids, properties, null);
- }
-
- @Override
- protected RaftServerProxy newRaftServer(
- RaftPeerId id, StateMachine stateMachine, RaftGroup group,
- RaftProperties properties) throws IOException {
- GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
- return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null);
- }
-
- @Override
- protected void blockQueueAndSetDelay(String leaderId, int delayMs)
- throws InterruptedException {
- RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
- leaderId, delayMs, getMaxTimeout());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
new file mode 100644
index 0000000..176cfa0
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.server.GrpcService;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.impl.*;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+
+public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
+ public static final Factory<MiniRaftClusterWithGrpc> FACTORY
+ = new Factory<MiniRaftClusterWithGrpc>() {
+ @Override
+ public MiniRaftClusterWithGrpc newCluster(
+ String[] ids, RaftProperties prop) {
+ RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
+ return new MiniRaftClusterWithGrpc(ids, prop);
+ }
+ };
+
+ public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
+ @Override
+ default Factory<MiniRaftClusterWithGrpc> getFactory() {
+ return FACTORY;
+ }
+ }
+
+ public static final DelayLocalExecutionInjection sendServerRequestInjection =
+ new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
+
+ private MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
+ super(ids, properties, null);
+ }
+
+ @Override
+ protected RaftServerProxy newRaftServer(
+ RaftPeerId id, StateMachine stateMachine, RaftGroup group,
+ RaftProperties properties) throws IOException {
+ GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
+ return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null);
+ }
+
+ @Override
+ protected void blockQueueAndSetDelay(String leaderId, int delayMs)
+ throws InterruptedException {
+ RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
+ leaderId, delayMs, getMaxTimeout());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
index 0b5e2a9..657bfd1 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
@@ -23,6 +23,6 @@ import org.apache.ratis.server.impl.GroupManagementBaseTest;
public class TestGroupManagementWithGrpc extends GroupManagementBaseTest {
@Override
public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
- return MiniRaftClusterWithGRpc.FACTORY;
+ return MiniRaftClusterWithGrpc.FACTORY;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
index a62dab0..eb08336 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
@@ -22,15 +22,15 @@ import org.apache.ratis.server.impl.LeaderElectionTests;
import org.junit.Test;
public class TestLeaderElectionWithGrpc
- extends LeaderElectionTests<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+ extends LeaderElectionTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
@Override
@Test
public void testEnforceLeader() throws Exception {
super.testEnforceLeader();
- MiniRaftClusterWithGRpc.sendServerRequestInjection.clear();
+ MiniRaftClusterWithGrpc.sendServerRequestInjection.clear();
BlockRequestHandlingInjection.getInstance().unblockAll();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
index 752a3dd..614787e 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
@@ -19,6 +19,6 @@ package org.apache.ratis.grpc;
import org.apache.ratis.RaftAsyncTests;
-public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
index fc110ea..d2b71bc 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
@@ -20,6 +20,6 @@ package org.apache.ratis.grpc;
import org.apache.ratis.RaftExceptionBaseTest;
public class TestRaftExceptionWithGrpc
- extends RaftExceptionBaseTest<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+ extends RaftExceptionBaseTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
deleted file mode 100644
index 822b923..0000000
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.grpc.server.RaftServerProtocolService;
-import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
-import org.apache.ratis.util.LogUtils;
-
-import java.io.IOException;
-
-public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest {
- static {
- LogUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG);
- }
-
- @Override
- public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException {
- return MiniRaftClusterWithGRpc.FACTORY.newCluster(peerNum, prop);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java
new file mode 100644
index 0000000..29f8bea
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.grpc.server.GrpcServerProtocolService;
+import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
+import org.apache.ratis.util.LogUtils;
+
+import java.io.IOException;
+
+public class TestRaftReconfigurationWithGrpc extends RaftReconfigurationBaseTest {
+ static {
+ LogUtils.setLogLevel(GrpcServerProtocolService.LOG, Level.DEBUG);
+ }
+
+ @Override
+ public MiniRaftClusterWithGrpc getCluster(int peerNum) {
+ return MiniRaftClusterWithGrpc.FACTORY.newCluster(peerNum, prop);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 7173e1f..8a9e94b 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -33,8 +33,8 @@ public class TestRaftServerWithGrpc extends BaseTest {
@Test
public void testServerRestartOnException() throws Exception {
RaftProperties properties = new RaftProperties();
- final MiniRaftClusterWithGRpc cluster
- = MiniRaftClusterWithGRpc.FACTORY.newCluster(1, properties);
+ final MiniRaftClusterWithGrpc cluster
+ = MiniRaftClusterWithGrpc.FACTORY.newCluster(1, properties);
cluster.start();
RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
GrpcConfigKeys.Server.setPort(properties, cluster.getLeader().getServerRpc().getInetSocketAddress().getPort());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
index 091277d..a960478 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
@@ -18,14 +18,11 @@
package org.apache.ratis.grpc;
import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
-import java.io.IOException;
-
public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest {
@Override
public MiniRaftCluster.Factory<?> getFactory() {
- return MiniRaftClusterWithGRpc.FACTORY;
+ return MiniRaftClusterWithGrpc.FACTORY;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
index ca36738..c8789a7 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
@@ -20,7 +20,7 @@ package org.apache.ratis.grpc;
import org.apache.ratis.server.impl.RaftStateMachineExceptionTests;
public class TestRaftStateMachineExceptionWithGrpc
- extends RaftStateMachineExceptionTests<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+ extends RaftStateMachineExceptionTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 17956c7..f3897ac 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -21,8 +21,8 @@ import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.client.AppendStreamer;
-import org.apache.ratis.grpc.client.RaftOutputStream;
+import org.apache.ratis.grpc.client.GrpcClientStreamer;
+import org.apache.ratis.grpc.client.GrpcOutputStream;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.protocol.TermIndex;
@@ -51,14 +51,14 @@ import static org.junit.Assert.fail;
@Ignore
public class TestRaftStream extends BaseTest {
static {
- LogUtils.setLogLevel(AppendStreamer.LOG, Level.ALL);
+ LogUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
}
private static final RaftProperties prop = new RaftProperties();
private static final int NUM_SERVERS = 3;
private static final byte[] BYTES = new byte[4];
- private MiniRaftClusterWithGRpc cluster;
+ private MiniRaftClusterWithGrpc cluster;
@After
public void tearDown() {
@@ -85,12 +85,12 @@ public class TestRaftStream extends BaseTest {
// default 64K is too large for a test
GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
- cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+ cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
RaftServerImpl leader = waitForLeader(cluster);
- try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+ try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
cluster.getGroup(), leader.getId())) {
for (int i = 0; i < numRequests; i++) { // generate requests
out.write(toBytes(i));
@@ -124,11 +124,11 @@ public class TestRaftStream extends BaseTest {
LOG.info("Running testWriteAndFlush");
GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
- cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+ cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
RaftServerImpl leader = waitForLeader(cluster);
- RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+ GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
cluster.getGroup(), leader.getId());
int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
@@ -203,11 +203,11 @@ public class TestRaftStream extends BaseTest {
LOG.info("Running testWriteWithOffset");
GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
- cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+ cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
RaftServerImpl leader = waitForLeader(cluster);
- RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+ GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
cluster.getGroup(), leader.getId());
byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
@@ -261,7 +261,7 @@ public class TestRaftStream extends BaseTest {
LOG.info("Running testChangeLeader");
GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
- cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+ cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
final RaftServerImpl leader = waitForLeader(cluster);
@@ -273,7 +273,7 @@ public class TestRaftStream extends BaseTest {
new Thread(() -> {
LOG.info("Writer thread starts");
int count = 0;
- try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+ try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
cluster.getGroup(), leader.getId())) {
while (running.get()) {
out.write(toBytes(count++));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 16c0f31..2d0af07 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -36,8 +36,8 @@ import java.util.concurrent.CompletableFuture;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
public class TestRaftWithGrpc
- extends RaftBasicTests<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+ extends RaftBasicTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
{
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
@@ -53,7 +53,7 @@ public class TestRaftWithGrpc
@Test
public void testRequestTimeout() throws Exception {
- try(MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS)) {
+ try(MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS)) {
cluster.start();
testRequestTimeout(false, cluster, LOG);
}
@@ -62,7 +62,7 @@ public class TestRaftWithGrpc
@Test
public void testUpdateViaHeartbeat() throws Exception {
LOG.info("Running testUpdateViaHeartbeat");
- final MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS);
+ final MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS);
cluster.start();
waitForLeader(cluster);
long waitTime = 5000;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
index f577a48..30a3f0d 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -44,16 +44,16 @@ public class TestRetryCacheWithGrpc extends RetryCacheTests {
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
}
- private final MiniRaftClusterWithGRpc cluster;
+ private final MiniRaftClusterWithGrpc cluster;
public TestRetryCacheWithGrpc() throws IOException {
- cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(
+ cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(
NUM_SERVERS, properties);
Assert.assertNull(cluster.getLeader());
}
@Override
- public MiniRaftClusterWithGRpc getCluster() {
+ public MiniRaftClusterWithGrpc getCluster() {
return cluster;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
index ef978a1..30be724 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
@@ -20,6 +20,6 @@ package org.apache.ratis.grpc;
import org.apache.ratis.server.impl.ServerInformationBaseTest;
public class TestServerInformationWithGrpc
- extends ServerInformationBaseTest<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+ extends ServerInformationBaseTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-proto-shaded/src/main/proto/GRpc.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto
deleted file mode 100644
index d7e550e..0000000
--- a/ratis-proto-shaded/src/main/proto/GRpc.proto
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-syntax = "proto3";
-option java_package = "org.apache.ratis.shaded.proto.grpc";
-option java_outer_classname = "GRpcProtos";
-option java_generate_equals_and_hash = true;
-package ratis.grpc;
-
-import "Raft.proto";
-
-service RaftClientProtocolService {
- // A client-to-server RPC to set new raft configuration
- rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
- returns(ratis.common.RaftClientReplyProto) {}
-
- // A client-to-server stream RPC to append data
- rpc append(stream ratis.common.RaftClientRequestProto)
- returns (stream ratis.common.RaftClientReplyProto) {}
-}
-
-service RaftServerProtocolService {
- rpc requestVote(ratis.common.RequestVoteRequestProto)
- returns(ratis.common.RequestVoteReplyProto) {}
-
- rpc appendEntries(stream ratis.common.AppendEntriesRequestProto)
- returns(stream ratis.common.AppendEntriesReplyProto) {}
-
- rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
- returns(ratis.common.InstallSnapshotReplyProto) {}
-}
-
-service AdminProtocolService {
- // A client-to-server RPC to add a new group
- rpc groupManagement(ratis.common.GroupManagementRequestProto)
- returns(ratis.common.RaftClientReplyProto) {}
-
- rpc serverInformation(ratis.common.ServerInformationRequestProto)
- returns(ratis.common.ServerInformationReplyProto) {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-proto-shaded/src/main/proto/Grpc.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Grpc.proto b/ratis-proto-shaded/src/main/proto/Grpc.proto
new file mode 100644
index 0000000..5c4bbad
--- /dev/null
+++ b/ratis-proto-shaded/src/main/proto/Grpc.proto
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+option java_package = "org.apache.ratis.shaded.proto.grpc";
+option java_outer_classname = "GrpcProtos";
+option java_generate_equals_and_hash = true;
+package ratis.grpc;
+
+import "Raft.proto";
+
+service RaftClientProtocolService {
+ // A client-to-server RPC to set new raft configuration
+ rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
+ returns(ratis.common.RaftClientReplyProto) {}
+
+ // A client-to-server stream RPC to append data
+ rpc append(stream ratis.common.RaftClientRequestProto)
+ returns (stream ratis.common.RaftClientReplyProto) {}
+}
+
+service RaftServerProtocolService {
+ rpc requestVote(ratis.common.RequestVoteRequestProto)
+ returns(ratis.common.RequestVoteReplyProto) {}
+
+ rpc appendEntries(stream ratis.common.AppendEntriesRequestProto)
+ returns(stream ratis.common.AppendEntriesReplyProto) {}
+
+ rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
+ returns(ratis.common.InstallSnapshotReplyProto) {}
+}
+
+service AdminProtocolService {
+ // A client-to-server RPC to add a new group
+ rpc groupManagement(ratis.common.GroupManagementRequestProto)
+ returns(ratis.common.RaftClientReplyProto) {}
+
+ rpc serverInformation(ratis.common.ServerInformationRequestProto)
+ returns(ratis.common.ServerInformationReplyProto) {}
+}
[3/3] incubator-ratis git commit: RATIS-324. Rename grpc classes.
Contributed by Tsz Wo Nicholas Sze.
Posted by lj...@apache.org.
RATIS-324. Rename grpc classes. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ed8e60da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ed8e60da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ed8e60da
Branch: refs/heads/master
Commit: ed8e60dad83ae733fe6c6c3752e82f1fee72c12e
Parents: 974919e
Author: Lokesh Jain <lj...@apache.org>
Authored: Thu Sep 20 10:29:48 2018 +0530
Committer: Lokesh Jain <lj...@apache.org>
Committed: Thu Sep 20 10:29:48 2018 +0530
----------------------------------------------------------------------
.../ratis/examples/ParameterizedBaseTest.java | 6 +-
.../filestore/TestFileStoreAsyncWithGrpc.java | 6 +-
.../filestore/TestFileStoreWithGrpc.java | 6 +-
.../java/org/apache/ratis/grpc/GrpcFactory.java | 9 +-
.../java/org/apache/ratis/grpc/GrpcUtil.java | 131 ++++++
.../org/apache/ratis/grpc/RaftGRpcService.java | 154 -------
.../org/apache/ratis/grpc/RaftGrpcUtil.java | 131 ------
.../ratis/grpc/client/AppendStreamer.java | 390 -----------------
.../grpc/client/GrpcClientProtocolClient.java | 234 ++++++++++
.../grpc/client/GrpcClientProtocolProxy.java | 107 +++++
.../grpc/client/GrpcClientProtocolService.java | 195 +++++++++
.../apache/ratis/grpc/client/GrpcClientRpc.java | 14 +-
.../ratis/grpc/client/GrpcClientStreamer.java | 390 +++++++++++++++++
.../ratis/grpc/client/GrpcOutputStream.java | 112 +++++
.../grpc/client/RaftClientProtocolClient.java | 234 ----------
.../grpc/client/RaftClientProtocolProxy.java | 107 -----
.../grpc/client/RaftClientProtocolService.java | 195 ---------
.../ratis/grpc/client/RaftOutputStream.java | 114 -----
.../ratis/grpc/server/AdminProtocolService.java | 53 ---
.../ratis/grpc/server/GRpcLogAppender.java | 438 -------------------
.../grpc/server/GrpcAdminProtocolService.java | 53 +++
.../ratis/grpc/server/GrpcLogAppender.java | 437 ++++++++++++++++++
.../grpc/server/GrpcServerProtocolClient.java | 75 ++++
.../grpc/server/GrpcServerProtocolService.java | 134 ++++++
.../apache/ratis/grpc/server/GrpcService.java | 152 +++++++
.../grpc/server/RaftServerProtocolClient.java | 75 ----
.../grpc/server/RaftServerProtocolService.java | 134 ------
.../ratis/grpc/MiniRaftClusterWithGRpc.java | 72 ---
.../ratis/grpc/MiniRaftClusterWithGrpc.java | 72 +++
.../ratis/grpc/TestGroupManagementWithGrpc.java | 2 +-
.../ratis/grpc/TestLeaderElectionWithGrpc.java | 6 +-
.../ratis/grpc/TestRaftAsyncWithGrpc.java | 4 +-
.../ratis/grpc/TestRaftExceptionWithGrpc.java | 4 +-
.../grpc/TestRaftReconfigurationWithGRpc.java | 36 --
.../grpc/TestRaftReconfigurationWithGrpc.java | 36 ++
.../ratis/grpc/TestRaftServerWithGrpc.java | 4 +-
.../ratis/grpc/TestRaftSnapshotWithGrpc.java | 5 +-
.../TestRaftStateMachineExceptionWithGrpc.java | 4 +-
.../org/apache/ratis/grpc/TestRaftStream.java | 24 +-
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 8 +-
.../ratis/grpc/TestRetryCacheWithGrpc.java | 6 +-
.../grpc/TestServerInformationWithGrpc.java | 4 +-
ratis-proto-shaded/src/main/proto/GRpc.proto | 54 ---
ratis-proto-shaded/src/main/proto/Grpc.proto | 54 +++
44 files changed, 2237 insertions(+), 2244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
index 057c73a..03f60ec 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
@@ -21,7 +21,7 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc;
import org.apache.ratis.netty.MiniRaftClusterWithNetty;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
@@ -101,8 +101,8 @@ public abstract class ParameterizedBaseTest extends BaseTest {
if (isAll || classes.contains(MiniRaftClusterWithSimulatedRpc.class)) {
add(clusters, MiniRaftClusterWithSimulatedRpc.FACTORY, ids.next(), prop);
}
- if (isAll || classes.contains(MiniRaftClusterWithGRpc.class)) {
- add(clusters, MiniRaftClusterWithGRpc.FACTORY, ids.next(), prop);
+ if (isAll || classes.contains(MiniRaftClusterWithGrpc.class)) {
+ add(clusters, MiniRaftClusterWithGrpc.FACTORY, ids.next(), prop);
}
if (isAll || classes.contains(MiniRaftClusterWithNetty.class)) {
add(clusters, MiniRaftClusterWithNetty.FACTORY, ids.next(), prop);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
index 02bd2b0..d25c85e 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
@@ -17,9 +17,9 @@
*/
package org.apache.ratis.examples.filestore;
-import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
public class TestFileStoreAsyncWithGrpc
- extends FileStoreAsyncBaseTest<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+ extends FileStoreAsyncBaseTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
index 71ae294..6e46b6e 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
@@ -17,9 +17,9 @@
*/
package org.apache.ratis.examples.filestore;
-import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
public class TestFileStoreWithGrpc
- extends FileStoreBaseTest<MiniRaftClusterWithGRpc>
- implements MiniRaftClusterWithGRpc.FactoryGet {
+ extends FileStoreBaseTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 4f2612e..836ee1c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -21,7 +21,8 @@ import org.apache.ratis.client.ClientFactory;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.GrpcClientRpc;
-import org.apache.ratis.grpc.server.GRpcLogAppender;
+import org.apache.ratis.grpc.server.GrpcLogAppender;
+import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
@@ -38,12 +39,12 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
@Override
public LogAppender newLogAppender(RaftServerImpl server, LeaderState state,
FollowerInfo f) {
- return new GRpcLogAppender(server, state, f);
+ return new GrpcLogAppender(server, state, f);
}
@Override
- public RaftGRpcService newRaftServerRpc(RaftServer server) {
- return RaftGRpcService.newBuilder()
+ public GrpcService newRaftServerRpc(RaftServer server) {
+ return GrpcService.newBuilder()
.setServer(server)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
new file mode 100644
index 0000000..84f01c8
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.ServerNotReadyException;
+import org.apache.ratis.shaded.io.grpc.Metadata;
+import org.apache.ratis.shaded.io.grpc.Status;
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public interface GrpcUtil {
+ Metadata.Key<String> EXCEPTION_TYPE_KEY =
+ Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
+ Metadata.Key<String> CALL_ID =
+ Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER);
+
+ static StatusRuntimeException wrapException(Throwable t) {
+ return wrapException(t, -1);
+ }
+
+ static StatusRuntimeException wrapException(Throwable t, long callId) {
+ t = JavaUtils.unwrapCompletionException(t);
+
+ Metadata trailers = new Metadata();
+ trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
+ if (callId > 0) {
+ trailers.put(CALL_ID, String.valueOf(callId));
+ }
+ return new StatusRuntimeException(
+ Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers);
+ }
+
+ static Throwable unwrapThrowable(Throwable t) {
+ if (t instanceof StatusRuntimeException) {
+ final IOException ioe = tryUnwrapException((StatusRuntimeException)t);
+ if (ioe != null) {
+ return ioe;
+ }
+ }
+ return t;
+ }
+
+ static IOException unwrapException(StatusRuntimeException se) {
+ final IOException ioe = tryUnwrapException(se);
+ return ioe != null? ioe: new IOException(se);
+ }
+
+ static IOException tryUnwrapException(StatusRuntimeException se) {
+ final Metadata trailers = se.getTrailers();
+ final Status status = se.getStatus();
+ if (trailers != null && status != null) {
+ final String className = trailers.get(EXCEPTION_TYPE_KEY);
+ if (className != null) {
+ try {
+ Class<?> clazz = Class.forName(className);
+ final Exception unwrapped = ReflectionUtils.instantiateException(
+ clazz.asSubclass(Exception.class), status.getDescription(), se);
+ return IOUtils.asIOException(unwrapped);
+ } catch (Exception e) {
+ se.addSuppressed(e);
+ return new IOException(se);
+ }
+ }
+ }
+ return null;
+ }
+
+ static long getCallId(Throwable t) {
+ if (t instanceof StatusRuntimeException) {
+ final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
+ String callId = trailers.get(CALL_ID);
+ return callId != null ? Integer.parseInt(callId) : -1;
+ }
+ return -1;
+ }
+
+ static IOException unwrapIOException(Throwable t) {
+ final IOException e;
+ if (t instanceof StatusRuntimeException) {
+ e = GrpcUtil.unwrapException((StatusRuntimeException) t);
+ } else {
+ e = IOUtils.asIOException(t);
+ }
+ return e;
+ }
+
+ static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall(
+ StreamObserver<REPLY_PROTO> responseObserver,
+ CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
+ Function<REPLY, REPLY_PROTO> toProto) {
+ try {
+ supplier.get().whenCompleteAsync((reply, exception) -> {
+ if (exception != null) {
+ responseObserver.onError(GrpcUtil.wrapException(exception));
+ } else {
+ responseObserver.onNext(toProto.apply(reply));
+ responseObserver.onCompleted();
+ }
+ });
+ } catch (Exception e) {
+ responseObserver.onError(GrpcUtil.wrapException(e));
+ }
+ }
+
+ static void warn(Logger log, Supplier<String> message, Throwable t) {
+ LogUtils.warn(log, message, unwrapThrowable(t), StatusRuntimeException.class, ServerNotReadyException.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
deleted file mode 100644
index d638a45..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.grpc.client.RaftClientProtocolService;
-import org.apache.ratis.grpc.server.AdminProtocolService;
-import org.apache.ratis.grpc.server.RaftServerProtocolClient;
-import org.apache.ratis.grpc.server.RaftServerProtocolService;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
-import org.apache.ratis.shaded.io.grpc.Server;
-import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.function.Supplier;
-
-/** A grpc implementation of {@link RaftServerRpc}. */
-public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolClient, PeerProxyMap<RaftServerProtocolClient>> {
- static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class);
- public static final String GRPC_SEND_SERVER_REQUEST =
- RaftGRpcService.class.getSimpleName() + ".sendRequest";
-
- public static class Builder extends RaftServerRpc.Builder<Builder,RaftGRpcService> {
- private Builder() {}
-
- @Override
- public Builder getThis() {
- return this;
- }
-
- @Override
- public RaftGRpcService build() {
- return new RaftGRpcService(getServer());
- }
- }
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- private final Server server;
- private final Supplier<InetSocketAddress> addressSupplier;
-
- private RaftGRpcService(RaftServer server) {
- this(server, server::getId,
- GrpcConfigKeys.Server.port(server.getProperties()),
- GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
- RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
- GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
- RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
- }
- private RaftGRpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port,
- SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
- SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration) {
- super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
- p -> new RaftServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration)));
- if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
- throw new IllegalArgumentException("Illegal configuration: "
- + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
- + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
- }
-
- server = NettyServerBuilder.forPort(port)
- .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
- .flowControlWindow(flowControlWindow.getSizeInt())
- .addService(new RaftServerProtocolService(idSupplier, raftServer))
- .addService(new RaftClientProtocolService(idSupplier, raftServer))
- .addService(new AdminProtocolService(raftServer))
- .build();
- addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort()));
- }
-
- @Override
- public SupportedRpcType getRpcType() {
- return SupportedRpcType.GRPC;
- }
-
- @Override
- public void startImpl() {
- try {
- server.start();
- } catch (IOException e) {
- ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
- }
- LOG.info("{}: {} started, listening on {}", getId(), getClass().getSimpleName(), getInetSocketAddress());
- }
-
- @Override
- public void closeImpl() throws IOException {
- final String name = getId() + ": shutdown server with port " + server.getPort();
- LOG.info("{} now", name);
- final Server s = server.shutdownNow();
- super.closeImpl();
- try {
- s.awaitTermination();
- } catch(InterruptedException e) {
- throw IOUtils.toInterruptedIOException(name + " failed", e);
- }
- LOG.info("{} successfully", name);
- }
-
- @Override
- public InetSocketAddress getInetSocketAddress() {
- return addressSupplier.get();
- }
-
- @Override
- public AppendEntriesReplyProto appendEntries(
- AppendEntriesRequestProto request) throws IOException {
- throw new UnsupportedOperationException(
- "Blocking AppendEntries call is not supported");
- }
-
- @Override
- public InstallSnapshotReplyProto installSnapshot(
- InstallSnapshotRequestProto request) throws IOException {
- throw new UnsupportedOperationException(
- "Blocking InstallSnapshot call is not supported");
- }
-
- @Override
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
- throws IOException {
- CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(),
- null, request);
-
- final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
- return getProxies().getProxy(target).requestVote(request);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
deleted file mode 100644
index ecbbf44..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.ServerNotReadyException;
-import org.apache.ratis.shaded.io.grpc.Metadata;
-import org.apache.ratis.shaded.io.grpc.Status;
-import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-public interface RaftGrpcUtil {
- Metadata.Key<String> EXCEPTION_TYPE_KEY =
- Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
- Metadata.Key<String> CALL_ID =
- Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER);
-
- static StatusRuntimeException wrapException(Throwable t) {
- return wrapException(t, -1);
- }
-
- static StatusRuntimeException wrapException(Throwable t, long callId) {
- t = JavaUtils.unwrapCompletionException(t);
-
- Metadata trailers = new Metadata();
- trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
- if (callId > 0) {
- trailers.put(CALL_ID, String.valueOf(callId));
- }
- return new StatusRuntimeException(
- Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers);
- }
-
- static Throwable unwrapThrowable(Throwable t) {
- if (t instanceof StatusRuntimeException) {
- final IOException ioe = tryUnwrapException((StatusRuntimeException)t);
- if (ioe != null) {
- return ioe;
- }
- }
- return t;
- }
-
- static IOException unwrapException(StatusRuntimeException se) {
- final IOException ioe = tryUnwrapException(se);
- return ioe != null? ioe: new IOException(se);
- }
-
- static IOException tryUnwrapException(StatusRuntimeException se) {
- final Metadata trailers = se.getTrailers();
- final Status status = se.getStatus();
- if (trailers != null && status != null) {
- final String className = trailers.get(EXCEPTION_TYPE_KEY);
- if (className != null) {
- try {
- Class<?> clazz = Class.forName(className);
- final Exception unwrapped = ReflectionUtils.instantiateException(
- clazz.asSubclass(Exception.class), status.getDescription(), se);
- return IOUtils.asIOException(unwrapped);
- } catch (Exception e) {
- se.addSuppressed(e);
- return new IOException(se);
- }
- }
- }
- return null;
- }
-
- static long getCallId(Throwable t) {
- if (t instanceof StatusRuntimeException) {
- final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
- String callId = trailers.get(CALL_ID);
- return callId != null ? Integer.parseInt(callId) : -1;
- }
- return -1;
- }
-
- static IOException unwrapIOException(Throwable t) {
- final IOException e;
- if (t instanceof StatusRuntimeException) {
- e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
- } else {
- e = IOUtils.asIOException(t);
- }
- return e;
- }
-
- static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall(
- StreamObserver<REPLY_PROTO> responseObserver,
- CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
- Function<REPLY, REPLY_PROTO> toProto) {
- try {
- supplier.get().whenCompleteAsync((reply, exception) -> {
- if (exception != null) {
- responseObserver.onError(RaftGrpcUtil.wrapException(exception));
- } else {
- responseObserver.onNext(toProto.apply(reply));
- responseObserver.onCompleted();
- }
- });
- } catch (Exception e) {
- responseObserver.onError(RaftGrpcUtil.wrapException(e));
- }
- }
-
- static void warn(Logger log, Supplier<String> message, Throwable t) {
- LogUtils.warn(log, message, unwrapThrowable(t), StatusRuntimeException.class, ServerNotReadyException.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
deleted file mode 100644
index 3068751..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public class AppendStreamer implements Closeable {
- public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class);
-
- enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}
-
- private static class ExceptionAndRetry {
- private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>();
- private final AtomicInteger retryTimes = new AtomicInteger(0);
- private final int maxRetryTimes;
- private final TimeDuration retryInterval;
-
- ExceptionAndRetry(RaftProperties prop) {
- maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop);
- retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop);
- }
-
- void addException(RaftPeerId peer, IOException e) {
- exceptionMap.put(peer, e);
- retryTimes.incrementAndGet();
- }
-
- IOException getCombinedException() {
- return new IOException("Exceptions: " + exceptionMap);
- }
-
- boolean shouldRetry() {
- return retryTimes.get() <= maxRetryTimes;
- }
- }
-
- private final Deque<RaftClientRequestProto> dataQueue;
- private final Deque<RaftClientRequestProto> ackQueue;
- private final int maxPendingNum;
- private final SizeInBytes maxMessageSize;
-
- private final PeerProxyMap<RaftClientProtocolProxy> proxyMap;
- private final Map<RaftPeerId, RaftPeer> peers;
- private RaftPeerId leaderId;
- private volatile RaftClientProtocolProxy leaderProxy;
- private final ClientId clientId;
-
- private volatile RunningState running = RunningState.RUNNING;
- private final ExceptionAndRetry exceptionAndRetry;
- private final Sender senderThread;
- private final RaftGroupId groupId;
-
- AppendStreamer(RaftProperties prop, RaftGroup group,
- RaftPeerId leaderId, ClientId clientId) {
- this.clientId = clientId;
- maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
- maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug);
- dataQueue = new ConcurrentLinkedDeque<>();
- ackQueue = new ConcurrentLinkedDeque<>();
- exceptionAndRetry = new ExceptionAndRetry(prop);
-
- this.groupId = group.getGroupId();
- this.peers = group.getPeers().stream().collect(
- Collectors.toMap(RaftPeer::getId, Function.identity()));
- proxyMap = new PeerProxyMap<>(clientId.toString(),
- raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new,
- prop));
- proxyMap.addPeers(group.getPeers());
- refreshLeaderProxy(leaderId, null);
-
- senderThread = new Sender();
- senderThread.setName(this.toString() + "-sender");
- senderThread.start();
- }
-
- private synchronized void refreshLeaderProxy(RaftPeerId suggested,
- RaftPeerId oldLeader) {
- if (suggested != null) {
- leaderId = suggested;
- } else {
- if (oldLeader == null) {
- leaderId = peers.keySet().iterator().next();
- } else {
- leaderId = CollectionUtils.random(oldLeader, peers.keySet());
- if (leaderId == null) {
- leaderId = oldLeader;
- }
- }
- }
- LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
- oldLeader, leaderId, suggested);
- if (leaderProxy != null) {
- leaderProxy.closeCurrentSession();
- }
- try {
- leaderProxy = proxyMap.getProxy(leaderId);
- } catch (IOException e) {
- LOG.error("Should not hit IOException here", e);
- refreshLeader(null, leaderId);
- }
- }
-
- private boolean isRunning() {
- return running == RunningState.RUNNING ||
- running == RunningState.LOOK_FOR_LEADER;
- }
-
- private void checkState() throws IOException {
- if (!isRunning()) {
- throwException("The AppendStreamer has been closed");
- }
- }
-
- synchronized void write(ByteString content, long seqNum)
- throws IOException {
- checkState();
- while (isRunning() && dataQueue.size() >= maxPendingNum) {
- try {
- wait();
- } catch (InterruptedException ignored) {
- }
- }
- if (isRunning()) {
- // wrap the current buffer into a RaftClientRequestProto
- final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
- clientId, leaderId, groupId, seqNum, seqNum, content);
- if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
- throw new IOException("msg size:" + request.getSerializedSize() +
- " exceeds maximum:" + maxMessageSize.getSizeInt());
- }
- dataQueue.offer(request);
- this.notifyAll();
- } else {
- throwException(this + " got closed.");
- }
- }
-
- synchronized void flush() throws IOException {
- checkState();
- if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
- return;
- }
- // wait for the pending Q to become empty
- while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
- try {
- wait();
- } catch (InterruptedException ignored) {
- }
- }
- if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
- throwException(this + " got closed before finishing flush");
- }
- }
-
- @Override
- public void close() throws IOException {
- if (!isRunning()) {
- return;
- }
- flush();
-
- running = RunningState.CLOSED;
- senderThread.interrupt();
- try {
- senderThread.join();
- } catch (InterruptedException ignored) {
- }
- proxyMap.close();
- }
-
- @Override
- public String toString() {
- return this.getClass().getSimpleName() + "-" + clientId;
- }
-
- private class Sender extends Daemon {
- @Override
- public void run() {
- while (isRunning()) {
-
- synchronized (AppendStreamer.this) {
- while (isRunning() && shouldWait()) {
- try {
- AppendStreamer.this.wait();
- } catch (InterruptedException ignored) {
- }
- }
- if (running == RunningState.RUNNING) {
- Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty");
- RaftClientRequestProto next = dataQueue.poll();
- leaderProxy.onNext(next);
- ackQueue.offer(next);
- }
- }
- }
- }
-
- private boolean shouldWait() {
- // the sender should wait if any of the following is true
- // 1) there is no data to send
- // 2) there are too many outstanding pending requests
- // 3) Error/NotLeaderException just happened, we're still waiting for
- // the first response to confirm the new leader
- return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
- running == RunningState.LOOK_FOR_LEADER;
- }
- }
-
- /** the response handler for stream RPC */
- private class ResponseHandler implements
- RaftClientProtocolProxy.CloseableStreamObserver {
- private final RaftPeerId targetId;
- // once handled the first NotLeaderException or Error, the handler should
- // be inactive and should not make any further action.
- private volatile boolean active = true;
-
- ResponseHandler(RaftPeer target) {
- targetId = target.getId();
- }
-
- @Override
- public String toString() {
- return AppendStreamer.this + "-ResponseHandler-" + targetId;
- }
-
- @Override
- public void onNext(RaftClientReplyProto reply) {
- if (!active) {
- return;
- }
- synchronized (AppendStreamer.this) {
- RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek());
- if (reply.getRpcReply().getSuccess()) {
- Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(),
- () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply));
- ackQueue.poll();
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending));
- }
- // we've identified the correct leader
- if (running == RunningState.LOOK_FOR_LEADER) {
- running = RunningState.RUNNING;
- }
- } else {
- // this may be a NotLeaderException
- RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
- final NotLeaderException nle = r.getNotLeaderException();
- if (nle != null) {
- LOG.debug("{} received a NotLeaderException from {}", this,
- r.getServerId());
- handleNotLeader(nle, targetId);
- }
- }
- AppendStreamer.this.notifyAll();
- }
- }
-
- @Override
- public void onError(Throwable t) {
- LOG.warn(this + " onError", t);
- if (active) {
- synchronized (AppendStreamer.this) {
- handleError(t, this);
- AppendStreamer.this.notifyAll();
- }
- }
- }
-
- @Override
- public void onCompleted() {
- LOG.info("{} onCompleted, pending requests #: {}", this,
- ackQueue.size());
- }
-
- @Override // called by handleError and handleNotLeader
- public void close() throws IOException {
- active = false;
- }
- }
-
- private void throwException(String msg) throws IOException {
- if (running == RunningState.ERROR) {
- throw exceptionAndRetry.getCombinedException();
- } else {
- throw new IOException(msg);
- }
- }
-
- private void handleNotLeader(NotLeaderException nle,
- RaftPeerId oldLeader) {
- Preconditions.assertTrue(Thread.holdsLock(AppendStreamer.this));
- // handle NotLeaderException: refresh leader and RaftConfiguration
- refreshPeers(nle.getPeers());
-
- refreshLeader(nle.getSuggestedLeader().getId(), oldLeader);
- }
-
- private void handleError(Throwable t, ResponseHandler handler) {
- Preconditions.assertTrue(Thread.holdsLock(AppendStreamer.this));
- final IOException e = RaftGrpcUtil.unwrapIOException(t);
-
- exceptionAndRetry.addException(handler.targetId, e);
- LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
- handler, e, exceptionAndRetry.retryTimes.get(),
- exceptionAndRetry.maxRetryTimes);
-
- leaderProxy.onError();
- if (exceptionAndRetry.shouldRetry()) {
- refreshLeader(null, leaderId);
- } else {
- running = RunningState.ERROR;
- }
- }
-
- private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) {
- running = RunningState.LOOK_FOR_LEADER;
- refreshLeaderProxy(suggestedLeader, oldLeader);
- reQueuePendingRequests(leaderId);
-
- final RaftClientRequestProto request = Objects.requireNonNull(
- dataQueue.poll());
- ackQueue.offer(request);
- try {
- exceptionAndRetry.retryInterval.sleep();
- } catch (InterruptedException ignored) {
- }
- leaderProxy.onNext(request);
- }
-
- private void reQueuePendingRequests(RaftPeerId newLeader) {
- if (isRunning()) {
- // resend all the pending requests
- while (!ackQueue.isEmpty()) {
- final RaftClientRequestProto oldRequest = ackQueue.pollLast();
- final RaftRpcRequestProto.Builder newRpc = RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest())
- .setReplyId(newLeader.toByteString());
- final RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder(oldRequest)
- .setRpcRequest(newRpc).build();
- dataQueue.offerFirst(newRequest);
- }
- }
- }
-
- private void refreshPeers(RaftPeer[] newPeers) {
- if (newPeers != null && newPeers.length > 0) {
- // we only add new peers, we do not remove any peer even if it no longer
- // belongs to the current raft conf
- Arrays.stream(newPeers).forEach(peer -> {
- peers.putIfAbsent(peer.getId(), peer);
- proxyMap.computeIfAbsent(peer);
- });
-
- LOG.debug("refreshed peers: {}", peers);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
new file mode 100644
index 0000000..a2e53bf
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.netty.NegotiationType;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
+import org.apache.ratis.util.CheckedSupplier;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class GrpcClientProtocolClient implements Closeable {
+ public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolClient.class);
+
+ private final Supplier<String> name;
+ private final RaftPeer target;
+ private final ManagedChannel channel;
+
+ private final TimeDuration requestTimeoutDuration;
+ private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+
+ private final RaftClientProtocolServiceBlockingStub blockingStub;
+ private final RaftClientProtocolServiceStub asyncStub;
+ private final AdminProtocolServiceBlockingStub adminBlockingStub;
+
+ private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
+
+ public GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties) {
+ this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
+ this.target = target;
+
+ final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
+ final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
+ channel = NettyChannelBuilder.forTarget(target.getAddress())
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .flowControlWindow(flowControlWindow.getSizeInt())
+ .maxInboundMessageSize(maxMessageSize.getSizeInt())
+ .build();
+ blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
+ asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
+ adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
+ this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
+ }
+
+ String getName() {
+ return name.get();
+ }
+
+ @Override
+ public void close() {
+ final AsyncStreamObservers observers = appendStreamObservers.get();
+ if (observers != null) {
+ observers.close();
+ }
+ channel.shutdownNow();
+ }
+
+ RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
+ return blockingCall(() -> adminBlockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .groupManagement(request));
+ }
+
+ ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) {
+ return adminBlockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .serverInformation(request);
+ }
+
+ RaftClientReplyProto setConfiguration(
+ SetConfigurationRequestProto request) throws IOException {
+ return blockingCall(() -> blockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .setConfiguration(request));
+ }
+
+ private static RaftClientReplyProto blockingCall(
+ CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
+ ) throws IOException {
+ try {
+ return supplier.get();
+ } catch (StatusRuntimeException e) {
+ throw GrpcUtil.unwrapException(e);
+ }
+ }
+
+ StreamObserver<RaftClientRequestProto> append(
+ StreamObserver<RaftClientReplyProto> responseHandler) {
+ return asyncStub.append(responseHandler);
+ }
+
+ StreamObserver<RaftClientRequestProto> appendWithTimeout(
+ StreamObserver<RaftClientReplyProto> responseHandler) {
+ return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .append(responseHandler);
+ }
+
+ AsyncStreamObservers getAppendStreamObservers() {
+ return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
+ }
+
+ public RaftPeer getTarget() {
+ return target;
+ }
+
+ class AsyncStreamObservers implements Closeable {
+ /** Request map: callId -> future */
+ private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+ private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
+ @Override
+ public void onNext(RaftClientReplyProto proto) {
+ final long callId = proto.getRpcReply().getCallId();
+ try {
+ final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
+ final NotLeaderException nle = reply.getNotLeaderException();
+ if (nle != null) {
+ completeReplyExceptionally(nle, NotLeaderException.class.getName());
+ return;
+ }
+ handleReplyFuture(callId, f -> f.complete(reply));
+ } catch (Throwable t) {
+ handleReplyFuture(callId, f -> f.completeExceptionally(t));
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ final IOException ioe = GrpcUtil.unwrapIOException(t);
+ completeReplyExceptionally(ioe, "onError");
+ }
+
+ @Override
+ public void onCompleted() {
+ completeReplyExceptionally(null, "completed");
+ }
+ };
+ private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
+
+ CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
+ final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
+ if (map == null) {
+ return JavaUtils.completeExceptionally(new IOException("Already closed."));
+ }
+ final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
+ CollectionUtils.putNew(request.getCallId(), f, map,
+ () -> getName() + ":" + getClass().getSimpleName());
+ try {
+ requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
+ scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
+ () -> "Timeout check failed for client request: " + request);
+ } catch(Throwable t) {
+ handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
+ }
+ return f;
+ }
+
+ private void timeoutCheck(RaftClientRequest request) {
+ handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
+ new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
+ }
+
+ private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
+ Optional.ofNullable(replies.get())
+ .map(replyMap -> replyMap.remove(callId))
+ .ifPresent(handler);
+ }
+
+ @Override
+ public void close() {
+ requestStreamObserver.onCompleted();
+ completeReplyExceptionally(null, "close");
+ }
+
+ private void completeReplyExceptionally(Throwable t, String event) {
+ appendStreamObservers.compareAndSet(this, null);
+ final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
+ if (map == null) {
+ return;
+ }
+ for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) {
+ final CompletableFuture<RaftClientReply> f = entry.getValue();
+ if (!f.isDone()) {
+ f.completeExceptionally(t != null? t
+ : new IOException(getName() + ": Stream " + event
+ + ": no reply for async request cid=" + entry.getKey()));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
new file mode 100644
index 0000000..156e6c3
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.RaftPeer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Function;
+
+public class GrpcClientProtocolProxy implements Closeable {
+ private final GrpcClientProtocolClient proxy;
+ private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
+ private RpcSession currentSession;
+
+ public GrpcClientProtocolProxy(ClientId clientId, RaftPeer target,
+ Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
+ RaftProperties properties) {
+ proxy = new GrpcClientProtocolClient(clientId, target, properties);
+ this.responseHandlerCreation = responseHandlerCreation;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeCurrentSession();
+ proxy.close();
+ }
+
+ @Override
+ public String toString() {
+ return "ProxyTo:" + proxy.getTarget();
+ }
+
+ public void closeCurrentSession() {
+ if (currentSession != null) {
+ currentSession.close();
+ currentSession = null;
+ }
+ }
+
+ public void onNext(RaftClientRequestProto request) {
+ if (currentSession == null) {
+ currentSession = new RpcSession(
+ responseHandlerCreation.apply(proxy.getTarget()));
+ }
+ currentSession.requestObserver.onNext(request);
+ }
+
+ public void onError() {
+ if (currentSession != null) {
+ currentSession.onError();
+ }
+ }
+
+ public interface CloseableStreamObserver
+ extends StreamObserver<RaftClientReplyProto>, Closeable {
+ }
+
+ class RpcSession implements Closeable {
+ private final StreamObserver<RaftClientRequestProto> requestObserver;
+ private final CloseableStreamObserver responseHandler;
+ private boolean hasError = false;
+
+ RpcSession(CloseableStreamObserver responseHandler) {
+ this.responseHandler = responseHandler;
+ this.requestObserver = proxy.append(responseHandler);
+ }
+
+ void onError() {
+ hasError = true;
+ }
+
+ @Override
+ public void close() {
+ if (!hasError) {
+ try {
+ requestObserver.onCompleted();
+ } catch (Exception ignored) {
+ }
+ }
+ try {
+ responseHandler.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
new file mode 100644
index 0000000..22f7f56
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SlidingWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
+ public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);
+
+ private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
+ private final RaftClientRequest request;
+ private volatile RaftClientReply reply;
+
+ PendingAppend(RaftClientRequest request) {
+ this.request = request;
+ }
+
+ @Override
+ public boolean hasReply() {
+ return reply != null || this == COMPLETED;
+ }
+
+ @Override
+ public void setReply(RaftClientReply reply) {
+ this.reply = reply;
+ }
+
+ RaftClientReply getReply() {
+ return reply;
+ }
+
+ RaftClientRequest getRequest() {
+ return request;
+ }
+
+ @Override
+ public long getSeqNum() {
+ return request != null? request.getSeqNum(): Long.MAX_VALUE;
+ }
+
+ @Override
+ public String toString() {
+ return request != null? getSeqNum() + ":" + reply: "COMPLETED";
+ }
+ }
+ private static final PendingAppend COMPLETED = new PendingAppend(null);
+
+ private final Supplier<RaftPeerId> idSupplier;
+ private final RaftClientAsynchronousProtocol protocol;
+
+ public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) {
+ this.idSupplier = idSupplier;
+ this.protocol = protocol;
+ }
+
+ RaftPeerId getId() {
+ return idSupplier.get();
+ }
+
+ @Override
+ public void setConfiguration(SetConfigurationRequestProto proto,
+ StreamObserver<RaftClientReplyProto> responseObserver) {
+ final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
+ GrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request),
+ ClientProtoUtils::toRaftClientReplyProto);
+ }
+
+ @Override
+ public StreamObserver<RaftClientRequestProto> append(
+ StreamObserver<RaftClientReplyProto> responseObserver) {
+ return new AppendRequestStreamObserver(responseObserver);
+ }
+
+ private final AtomicInteger streamCount = new AtomicInteger();
+
+ private class AppendRequestStreamObserver implements
+ StreamObserver<RaftClientRequestProto> {
+ private final String name = getId() + "-" + streamCount.getAndIncrement();
+ private final StreamObserver<RaftClientReplyProto> responseObserver;
+ private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+ = new SlidingWindow.Server<>(name, COMPLETED);
+ private final AtomicBoolean isClosed;
+
+ AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
+ LOG.debug("new AppendRequestStreamObserver {}", name);
+ this.responseObserver = ro;
+ this.isClosed = new AtomicBoolean(false);
+ }
+
+ void processClientRequestAsync(PendingAppend pending) {
+ try {
+ protocol.submitClientRequestAsync(pending.getRequest()
+ ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
+ pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
+ ).exceptionally(exception -> {
+ // TODO: the exception may be from either raft or state machine.
+ // Currently we skip all the following responses when getting an
+ // exception from the state machine.
+ responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
+ return null;
+ });
+ } catch (IOException e) {
+ throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
+ }
+ }
+
+ @Override
+ public void onNext(RaftClientRequestProto request) {
+ try {
+ final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
+ final PendingAppend p = new PendingAppend(r);
+ slidingWindow.receivedRequest(p, this::processClientRequestAsync);
+ } catch (Throwable e) {
+ responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
+ }
+ }
+
+ private void sendReply(PendingAppend ready) {
+ Preconditions.assertTrue(ready.hasReply());
+ if (ready == COMPLETED) {
+ close();
+ } else {
+ LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
+ responseObserver.onNext(
+ ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // for now we just log a msg
+ GrpcUtil.warn(LOG, () -> name + ": onError", t);
+ slidingWindow.close();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (slidingWindow.endOfRequests()) {
+ close();
+ }
+ }
+
+ private void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ LOG.debug("{}: close", name);
+ responseObserver.onCompleted();
+ slidingWindow.close();
+ }
+ }
+
+ void responseError(Throwable t, Supplier<String> message) {
+ if (isClosed.compareAndSet(false, true)) {
+ t = JavaUtils.unwrapCompletionException(t);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(name + ": Failed " + message.get(), t);
+ }
+ responseObserver.onError(GrpcUtil.wrapException(t));
+ slidingWindow.close();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 48ab95d..47264e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -21,7 +21,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupManagementRequest;
import org.apache.ratis.protocol.RaftClientReply;
@@ -46,14 +46,14 @@ import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> {
+public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClient> {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
private final ClientId clientId;
private final int maxMessageSize;
public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
- super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, properties)));
+ super(new PeerProxyMap<>(clientId.toString(), p -> new GrpcClientProtocolClient(clientId, p, properties)));
this.clientId = clientId;
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
}
@@ -63,7 +63,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
RaftClientRequest request) {
final RaftPeerId serverId = request.getServerId();
try {
- final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
+ final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
// Reuse the same grpc stream for all async calls.
return proxy.getAppendStreamObservers().onNext(request);
} catch (IOException e) {
@@ -75,7 +75,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
public RaftClientReply sendRequest(RaftClientRequest request)
throws IOException {
final RaftPeerId serverId = request.getServerId();
- final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
+ final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
if (request instanceof GroupManagementRequest) {
final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto((GroupManagementRequest)request);
return ClientProtoUtils.toRaftClientReply(proxy.groupAdd(proto));
@@ -102,7 +102,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
}
private CompletableFuture<RaftClientReply> sendRequest(
- RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException {
+ RaftClientRequest request, GrpcClientProtocolClient proxy) throws IOException {
final RaftClientRequestProto requestProto =
toRaftClientRequestProto(request);
final CompletableFuture<RaftClientReplyProto> replyFuture =
@@ -117,7 +117,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
@Override
public void onError(Throwable t) {
- replyFuture.completeExceptionally(RaftGrpcUtil.unwrapIOException(t));
+ replyFuture.completeExceptionally(GrpcUtil.unwrapIOException(t));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
new file mode 100644
index 0000000..5e5b941
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class GrpcClientStreamer implements Closeable {
+ public static final Logger LOG = LoggerFactory.getLogger(GrpcClientStreamer.class);
+
+ enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}
+
+ private static class ExceptionAndRetry {
+ private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>();
+ private final AtomicInteger retryTimes = new AtomicInteger(0);
+ private final int maxRetryTimes;
+ private final TimeDuration retryInterval;
+
+ ExceptionAndRetry(RaftProperties prop) {
+ maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop);
+ retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop);
+ }
+
+ void addException(RaftPeerId peer, IOException e) {
+ exceptionMap.put(peer, e);
+ retryTimes.incrementAndGet();
+ }
+
+ IOException getCombinedException() {
+ return new IOException("Exceptions: " + exceptionMap);
+ }
+
+ boolean shouldRetry() {
+ return retryTimes.get() <= maxRetryTimes;
+ }
+ }
+
+ private final Deque<RaftClientRequestProto> dataQueue;
+ private final Deque<RaftClientRequestProto> ackQueue;
+ private final int maxPendingNum;
+ private final SizeInBytes maxMessageSize;
+
+ private final PeerProxyMap<GrpcClientProtocolProxy> proxyMap;
+ private final Map<RaftPeerId, RaftPeer> peers;
+ private RaftPeerId leaderId;
+ private volatile GrpcClientProtocolProxy leaderProxy;
+ private final ClientId clientId;
+
+ private volatile RunningState running = RunningState.RUNNING;
+ private final ExceptionAndRetry exceptionAndRetry;
+ private final Sender senderThread;
+ private final RaftGroupId groupId;
+
+ GrpcClientStreamer(RaftProperties prop, RaftGroup group,
+ RaftPeerId leaderId, ClientId clientId) {
+ this.clientId = clientId;
+ maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
+ maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug);
+ dataQueue = new ConcurrentLinkedDeque<>();
+ ackQueue = new ConcurrentLinkedDeque<>();
+ exceptionAndRetry = new ExceptionAndRetry(prop);
+
+ this.groupId = group.getGroupId();
+ this.peers = group.getPeers().stream().collect(
+ Collectors.toMap(RaftPeer::getId, Function.identity()));
+ proxyMap = new PeerProxyMap<>(clientId.toString(),
+ raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer, ResponseHandler::new,
+ prop));
+ proxyMap.addPeers(group.getPeers());
+ refreshLeaderProxy(leaderId, null);
+
+ senderThread = new Sender();
+ senderThread.setName(this.toString() + "-sender");
+ senderThread.start();
+ }
+
+ private synchronized void refreshLeaderProxy(RaftPeerId suggested,
+ RaftPeerId oldLeader) {
+ if (suggested != null) {
+ leaderId = suggested;
+ } else {
+ if (oldLeader == null) {
+ leaderId = peers.keySet().iterator().next();
+ } else {
+ leaderId = CollectionUtils.random(oldLeader, peers.keySet());
+ if (leaderId == null) {
+ leaderId = oldLeader;
+ }
+ }
+ }
+ LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
+ oldLeader, leaderId, suggested);
+ if (leaderProxy != null) {
+ leaderProxy.closeCurrentSession();
+ }
+ try {
+ leaderProxy = proxyMap.getProxy(leaderId);
+ } catch (IOException e) {
+ LOG.error("Should not hit IOException here", e);
+ refreshLeader(null, leaderId);
+ }
+ }
+
+ private boolean isRunning() {
+ return running == RunningState.RUNNING ||
+ running == RunningState.LOOK_FOR_LEADER;
+ }
+
+ private void checkState() throws IOException {
+ if (!isRunning()) {
+ throwException("The GrpcClientStreamer has been closed");
+ }
+ }
+
+ synchronized void write(ByteString content, long seqNum)
+ throws IOException {
+ checkState();
+ while (isRunning() && dataQueue.size() >= maxPendingNum) {
+ try {
+ wait();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (isRunning()) {
+ // wrap the current buffer into a RaftClientRequestProto
+ final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
+ clientId, leaderId, groupId, seqNum, seqNum, content);
+ if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
+ throw new IOException("msg size:" + request.getSerializedSize() +
+ " exceeds maximum:" + maxMessageSize.getSizeInt());
+ }
+ dataQueue.offer(request);
+ this.notifyAll();
+ } else {
+ throwException(this + " got closed.");
+ }
+ }
+
+ synchronized void flush() throws IOException {
+ checkState();
+ if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
+ return;
+ }
+ // wait for the pending Q to become empty
+ while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
+ try {
+ wait();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
+ throwException(this + " got closed before finishing flush");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!isRunning()) {
+ return;
+ }
+ flush();
+
+ running = RunningState.CLOSED;
+ senderThread.interrupt();
+ try {
+ senderThread.join();
+ } catch (InterruptedException ignored) {
+ }
+ proxyMap.close();
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + "-" + clientId;
+ }
+
+ private class Sender extends Daemon {
+ @Override
+ public void run() {
+ while (isRunning()) {
+
+ synchronized (GrpcClientStreamer.this) {
+ while (isRunning() && shouldWait()) {
+ try {
+ GrpcClientStreamer.this.wait();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (running == RunningState.RUNNING) {
+ Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty");
+ RaftClientRequestProto next = dataQueue.poll();
+ leaderProxy.onNext(next);
+ ackQueue.offer(next);
+ }
+ }
+ }
+ }
+
+ private boolean shouldWait() {
+ // the sender should wait if any of the following is true
+ // 1) there is no data to send
+ // 2) there are too many outstanding pending requests
+ // 3) Error/NotLeaderException just happened, we're still waiting for
+ // the first response to confirm the new leader
+ return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
+ running == RunningState.LOOK_FOR_LEADER;
+ }
+ }
+
+ /** the response handler for stream RPC */
+ private class ResponseHandler implements
+ GrpcClientProtocolProxy.CloseableStreamObserver {
+ private final RaftPeerId targetId;
+ // once handled the first NotLeaderException or Error, the handler should
+ // be inactive and should not make any further action.
+ private volatile boolean active = true;
+
+ ResponseHandler(RaftPeer target) {
+ targetId = target.getId();
+ }
+
+ @Override
+ public String toString() {
+ return GrpcClientStreamer.this + "-ResponseHandler-" + targetId;
+ }
+
+ @Override
+ public void onNext(RaftClientReplyProto reply) {
+ if (!active) {
+ return;
+ }
+ synchronized (GrpcClientStreamer.this) {
+ RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek());
+ if (reply.getRpcReply().getSuccess()) {
+ Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(),
+ () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply));
+ ackQueue.poll();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending));
+ }
+ // we've identified the correct leader
+ if (running == RunningState.LOOK_FOR_LEADER) {
+ running = RunningState.RUNNING;
+ }
+ } else {
+ // this may be a NotLeaderException
+ RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
+ final NotLeaderException nle = r.getNotLeaderException();
+ if (nle != null) {
+ LOG.debug("{} received a NotLeaderException from {}", this,
+ r.getServerId());
+ handleNotLeader(nle, targetId);
+ }
+ }
+ GrpcClientStreamer.this.notifyAll();
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.warn(this + " onError", t);
+ if (active) {
+ synchronized (GrpcClientStreamer.this) {
+ handleError(t, this);
+ GrpcClientStreamer.this.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.info("{} onCompleted, pending requests #: {}", this,
+ ackQueue.size());
+ }
+
+ @Override // called by handleError and handleNotLeader
+ public void close() throws IOException {
+ active = false;
+ }
+ }
+
+ private void throwException(String msg) throws IOException {
+ if (running == RunningState.ERROR) {
+ throw exceptionAndRetry.getCombinedException();
+ } else {
+ throw new IOException(msg);
+ }
+ }
+
+ private void handleNotLeader(NotLeaderException nle,
+ RaftPeerId oldLeader) {
+ Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this));
+ // handle NotLeaderException: refresh leader and RaftConfiguration
+ refreshPeers(nle.getPeers());
+
+ refreshLeader(nle.getSuggestedLeader().getId(), oldLeader);
+ }
+
+ private void handleError(Throwable t, ResponseHandler handler) {
+ Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this));
+ final IOException e = GrpcUtil.unwrapIOException(t);
+
+ exceptionAndRetry.addException(handler.targetId, e);
+ LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
+ handler, e, exceptionAndRetry.retryTimes.get(),
+ exceptionAndRetry.maxRetryTimes);
+
+ leaderProxy.onError();
+ if (exceptionAndRetry.shouldRetry()) {
+ refreshLeader(null, leaderId);
+ } else {
+ running = RunningState.ERROR;
+ }
+ }
+
+ private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) {
+ running = RunningState.LOOK_FOR_LEADER;
+ refreshLeaderProxy(suggestedLeader, oldLeader);
+ reQueuePendingRequests(leaderId);
+
+ final RaftClientRequestProto request = Objects.requireNonNull(
+ dataQueue.poll());
+ ackQueue.offer(request);
+ try {
+ exceptionAndRetry.retryInterval.sleep();
+ } catch (InterruptedException ignored) {
+ }
+ leaderProxy.onNext(request);
+ }
+
+ private void reQueuePendingRequests(RaftPeerId newLeader) {
+ if (isRunning()) {
+ // resend all the pending requests
+ while (!ackQueue.isEmpty()) {
+ final RaftClientRequestProto oldRequest = ackQueue.pollLast();
+ final RaftRpcRequestProto.Builder newRpc = RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest())
+ .setReplyId(newLeader.toByteString());
+ final RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder(oldRequest)
+ .setRpcRequest(newRpc).build();
+ dataQueue.offerFirst(newRequest);
+ }
+ }
+ }
+
+ private void refreshPeers(RaftPeer[] newPeers) {
+ if (newPeers != null && newPeers.length > 0) {
+ // we only add new peers, we do not remove any peer even if it no longer
+ // belongs to the current raft conf
+ Arrays.stream(newPeers).forEach(peer -> {
+ peers.putIfAbsent(peer.getId(), peer);
+ proxyMap.computeIfAbsent(peer);
+ });
+
+ LOG.debug("refreshed peers: {}", peers);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
new file mode 100644
index 0000000..e857aaf
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.ProtoUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GrpcOutputStream extends OutputStream {
+ /** internal buffer */
+ private final byte buf[];
+ private int count;
+ private final AtomicLong seqNum = new AtomicLong();
+ private final ClientId clientId;
+ private final GrpcClientStreamer streamer;
+
+ private boolean closed = false;
+
+ public GrpcOutputStream(RaftProperties prop, ClientId clientId,
+ RaftGroup group, RaftPeerId leaderId) {
+ final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
+ buf = new byte[bufferSize];
+ count = 0;
+ this.clientId = clientId;
+ streamer = new GrpcClientStreamer(prop, group, leaderId, clientId);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ checkClosed();
+ buf[count++] = (byte)b;
+ flushIfNecessary();
+ }
+
+ private void flushIfNecessary() throws IOException {
+ if(count == buf.length) {
+ flushToStreamer();
+ }
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ checkClosed();
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ int total = 0;
+ while (total < len) {
+ int toWrite = Math.min(len - total, buf.length - count);
+ System.arraycopy(b, off + total, buf, count, toWrite);
+ count += toWrite;
+ total += toWrite;
+ flushIfNecessary();
+ }
+ }
+
+ private void flushToStreamer() throws IOException {
+ if (count > 0) {
+ streamer.write(ProtoUtils.toByteString(buf, 0, count),
+ seqNum.getAndIncrement());
+ count = 0;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ checkClosed();
+ flushToStreamer();
+ streamer.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ flushToStreamer();
+ streamer.close(); // streamer will flush
+ this.closed = true;
+ }
+
+ @Override
+ public String toString() {
+ return "GrpcOutputStream-" + clientId;
+ }
+
+ private void checkClosed() throws IOException {
+ if (closed) {
+ throw new IOException(this.toString() + " was closed.");
+ }
+ }
+}
[2/3] incubator-ratis git commit: RATIS-324. Rename grpc classes.
Contributed by Tsz Wo Nicholas Sze.
Posted by lj...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
deleted file mode 100644
index 9dd1a31..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.RaftClientConfigKeys;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.util.TimeoutScheduler;
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
-import org.apache.ratis.shaded.io.grpc.netty.NegotiationType;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
-import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
-import org.apache.ratis.util.CheckedSupplier;
-import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-public class RaftClientProtocolClient implements Closeable {
- public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolClient.class);
-
- private final Supplier<String> name;
- private final RaftPeer target;
- private final ManagedChannel channel;
-
- private final TimeDuration requestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
-
- private final RaftClientProtocolServiceBlockingStub blockingStub;
- private final RaftClientProtocolServiceStub asyncStub;
- private final AdminProtocolServiceBlockingStub adminBlockingStub;
-
- private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
-
- public RaftClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties) {
- this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
- this.target = target;
-
- final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
- final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
- channel = NettyChannelBuilder.forTarget(target.getAddress())
- .negotiationType(NegotiationType.PLAINTEXT)
- .flowControlWindow(flowControlWindow.getSizeInt())
- .maxInboundMessageSize(maxMessageSize.getSizeInt())
- .build();
- blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
- asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
- adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
- this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
- }
-
- String getName() {
- return name.get();
- }
-
- @Override
- public void close() {
- final AsyncStreamObservers observers = appendStreamObservers.get();
- if (observers != null) {
- observers.close();
- }
- channel.shutdownNow();
- }
-
- RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
- return blockingCall(() -> adminBlockingStub
- .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .groupManagement(request));
- }
-
- ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) {
- return adminBlockingStub
- .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .serverInformation(request);
- }
-
- RaftClientReplyProto setConfiguration(
- SetConfigurationRequestProto request) throws IOException {
- return blockingCall(() -> blockingStub
- .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .setConfiguration(request));
- }
-
- private static RaftClientReplyProto blockingCall(
- CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
- ) throws IOException {
- try {
- return supplier.get();
- } catch (StatusRuntimeException e) {
- throw RaftGrpcUtil.unwrapException(e);
- }
- }
-
- StreamObserver<RaftClientRequestProto> append(
- StreamObserver<RaftClientReplyProto> responseHandler) {
- return asyncStub.append(responseHandler);
- }
-
- StreamObserver<RaftClientRequestProto> appendWithTimeout(
- StreamObserver<RaftClientReplyProto> responseHandler) {
- return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .append(responseHandler);
- }
-
- AsyncStreamObservers getAppendStreamObservers() {
- return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
- }
-
- public RaftPeer getTarget() {
- return target;
- }
-
- class AsyncStreamObservers implements Closeable {
- /** Request map: callId -> future */
- private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
- private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
- @Override
- public void onNext(RaftClientReplyProto proto) {
- final long callId = proto.getRpcReply().getCallId();
- try {
- final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
- final NotLeaderException nle = reply.getNotLeaderException();
- if (nle != null) {
- completeReplyExceptionally(nle, NotLeaderException.class.getName());
- return;
- }
- handleReplyFuture(callId, f -> f.complete(reply));
- } catch (Throwable t) {
- handleReplyFuture(callId, f -> f.completeExceptionally(t));
- }
- }
-
- @Override
- public void onError(Throwable t) {
- final IOException ioe = RaftGrpcUtil.unwrapIOException(t);
- completeReplyExceptionally(ioe, "onError");
- }
-
- @Override
- public void onCompleted() {
- completeReplyExceptionally(null, "completed");
- }
- };
- private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
-
- CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
- final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
- if (map == null) {
- return JavaUtils.completeExceptionally(new IOException("Already closed."));
- }
- final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
- CollectionUtils.putNew(request.getCallId(), f, map,
- () -> getName() + ":" + getClass().getSimpleName());
- try {
- requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
- scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
- () -> "Timeout check failed for client request: " + request);
- } catch(Throwable t) {
- handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
- }
- return f;
- }
-
- private void timeoutCheck(RaftClientRequest request) {
- handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
- new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
- }
-
- private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
- Optional.ofNullable(replies.get())
- .map(replyMap -> replyMap.remove(callId))
- .ifPresent(handler);
- }
-
- @Override
- public void close() {
- requestStreamObserver.onCompleted();
- completeReplyExceptionally(null, "close");
- }
-
- private void completeReplyExceptionally(Throwable t, String event) {
- appendStreamObservers.compareAndSet(this, null);
- final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
- if (map == null) {
- return;
- }
- for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) {
- final CompletableFuture<RaftClientReply> f = entry.getValue();
- if (!f.isDone()) {
- f.completeExceptionally(t != null? t
- : new IOException(getName() + ": Stream " + event
- + ": no reply for async request cid=" + entry.getKey()));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
deleted file mode 100644
index ee9ce4e..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.protocol.RaftPeer;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.function.Function;
-
-public class RaftClientProtocolProxy implements Closeable {
- private final RaftClientProtocolClient proxy;
- private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
- private RpcSession currentSession;
-
- public RaftClientProtocolProxy(ClientId clientId, RaftPeer target,
- Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
- RaftProperties properties) {
- proxy = new RaftClientProtocolClient(clientId, target, properties);
- this.responseHandlerCreation = responseHandlerCreation;
- }
-
- @Override
- public void close() throws IOException {
- closeCurrentSession();
- proxy.close();
- }
-
- @Override
- public String toString() {
- return "ProxyTo:" + proxy.getTarget();
- }
-
- public void closeCurrentSession() {
- if (currentSession != null) {
- currentSession.close();
- currentSession = null;
- }
- }
-
- public void onNext(RaftClientRequestProto request) {
- if (currentSession == null) {
- currentSession = new RpcSession(
- responseHandlerCreation.apply(proxy.getTarget()));
- }
- currentSession.requestObserver.onNext(request);
- }
-
- public void onError() {
- if (currentSession != null) {
- currentSession.onError();
- }
- }
-
- public interface CloseableStreamObserver
- extends StreamObserver<RaftClientReplyProto>, Closeable {
- }
-
- class RpcSession implements Closeable {
- private final StreamObserver<RaftClientRequestProto> requestObserver;
- private final CloseableStreamObserver responseHandler;
- private boolean hasError = false;
-
- RpcSession(CloseableStreamObserver responseHandler) {
- this.responseHandler = responseHandler;
- this.requestObserver = proxy.append(responseHandler);
- }
-
- void onError() {
- hasError = true;
- }
-
- @Override
- public void close() {
- if (!hasError) {
- try {
- requestObserver.onCompleted();
- } catch (Exception ignored) {
- }
- }
- try {
- responseHandler.close();
- } catch (IOException ignored) {
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
deleted file mode 100644
index 4b92be5..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.SlidingWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-
-public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase {
- public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
-
- private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
- private final RaftClientRequest request;
- private volatile RaftClientReply reply;
-
- PendingAppend(RaftClientRequest request) {
- this.request = request;
- }
-
- @Override
- public boolean hasReply() {
- return reply != null || this == COMPLETED;
- }
-
- @Override
- public void setReply(RaftClientReply reply) {
- this.reply = reply;
- }
-
- RaftClientReply getReply() {
- return reply;
- }
-
- RaftClientRequest getRequest() {
- return request;
- }
-
- @Override
- public long getSeqNum() {
- return request != null? request.getSeqNum(): Long.MAX_VALUE;
- }
-
- @Override
- public String toString() {
- return request != null? getSeqNum() + ":" + reply: "COMPLETED";
- }
- }
- private static final PendingAppend COMPLETED = new PendingAppend(null);
-
- private final Supplier<RaftPeerId> idSupplier;
- private final RaftClientAsynchronousProtocol protocol;
-
- public RaftClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) {
- this.idSupplier = idSupplier;
- this.protocol = protocol;
- }
-
- RaftPeerId getId() {
- return idSupplier.get();
- }
-
- @Override
- public void setConfiguration(SetConfigurationRequestProto proto,
- StreamObserver<RaftClientReplyProto> responseObserver) {
- final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
- RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request),
- ClientProtoUtils::toRaftClientReplyProto);
- }
-
- @Override
- public StreamObserver<RaftClientRequestProto> append(
- StreamObserver<RaftClientReplyProto> responseObserver) {
- return new AppendRequestStreamObserver(responseObserver);
- }
-
- private final AtomicInteger streamCount = new AtomicInteger();
-
- private class AppendRequestStreamObserver implements
- StreamObserver<RaftClientRequestProto> {
- private final String name = getId() + "-" + streamCount.getAndIncrement();
- private final StreamObserver<RaftClientReplyProto> responseObserver;
- private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
- = new SlidingWindow.Server<>(name, COMPLETED);
- private final AtomicBoolean isClosed;
-
- AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
- LOG.debug("new AppendRequestStreamObserver {}", name);
- this.responseObserver = ro;
- this.isClosed = new AtomicBoolean(false);
- }
-
- void processClientRequestAsync(PendingAppend pending) {
- try {
- protocol.submitClientRequestAsync(pending.getRequest()
- ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
- pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
- ).exceptionally(exception -> {
- // TODO: the exception may be from either raft or state machine.
- // Currently we skip all the following responses when getting an
- // exception from the state machine.
- responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
- return null;
- });
- } catch (IOException e) {
- throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
- }
- }
-
- @Override
- public void onNext(RaftClientRequestProto request) {
- try {
- final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
- final PendingAppend p = new PendingAppend(r);
- slidingWindow.receivedRequest(p, this::processClientRequestAsync);
- } catch (Throwable e) {
- responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
- }
- }
-
- private void sendReply(PendingAppend ready) {
- Preconditions.assertTrue(ready.hasReply());
- if (ready == COMPLETED) {
- close();
- } else {
- LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
- responseObserver.onNext(
- ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
- }
- }
-
- @Override
- public void onError(Throwable t) {
- // for now we just log a msg
- RaftGrpcUtil.warn(LOG, () -> name + ": onError", t);
- slidingWindow.close();
- }
-
- @Override
- public void onCompleted() {
- if (slidingWindow.endOfRequests()) {
- close();
- }
- }
-
- private void close() {
- if (isClosed.compareAndSet(false, true)) {
- LOG.debug("{}: close", name);
- responseObserver.onCompleted();
- slidingWindow.close();
- }
- }
-
- void responseError(Throwable t, Supplier<String> message) {
- if (isClosed.compareAndSet(false, true)) {
- t = JavaUtils.unwrapCompletionException(t);
- if (LOG.isDebugEnabled()) {
- LOG.debug(name + ": Failed " + message.get(), t);
- }
- responseObserver.onError(RaftGrpcUtil.wrapException(t));
- slidingWindow.close();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
deleted file mode 100644
index 09d57a0..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.util.ProtoUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class RaftOutputStream extends OutputStream {
- /** internal buffer */
- private final byte buf[];
- private int count;
- private final AtomicLong seqNum = new AtomicLong();
- private final ClientId clientId;
- private final AppendStreamer streamer;
-
- private boolean closed = false;
-
- public RaftOutputStream(RaftProperties prop, ClientId clientId,
- RaftGroup group, RaftPeerId leaderId) {
- final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
- buf = new byte[bufferSize];
- count = 0;
- this.clientId = clientId;
- streamer = new AppendStreamer(prop, group, leaderId, clientId);
- }
-
- @Override
- public void write(int b) throws IOException {
- checkClosed();
- buf[count++] = (byte)b;
- flushIfNecessary();
- }
-
- private void flushIfNecessary() throws IOException {
- if(count == buf.length) {
- flushToStreamer();
- }
- }
-
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- checkClosed();
- if (off < 0 || len < 0 || off > b.length - len) {
- throw new ArrayIndexOutOfBoundsException();
- }
-
- int total = 0;
- while (total < len) {
- int toWrite = Math.min(len - total, buf.length - count);
- System.arraycopy(b, off + total, buf, count, toWrite);
- count += toWrite;
- total += toWrite;
- flushIfNecessary();
- }
- }
-
- private void flushToStreamer() throws IOException {
- if (count > 0) {
- streamer.write(ProtoUtils.toByteString(buf, 0, count),
- seqNum.getAndIncrement());
- count = 0;
- }
- }
-
- @Override
- public void flush() throws IOException {
- checkClosed();
- flushToStreamer();
- streamer.flush();
- }
-
- @Override
- public void close() throws IOException {
- flushToStreamer();
- streamer.close(); // streamer will flush
- this.closed = true;
- }
-
- @Override
- public String toString() {
- return "RaftOutputStream-" + clientId;
- }
-
- private void checkClosed() throws IOException {
- if (closed) {
- throw new IOException(this.toString() + " was closed.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
deleted file mode 100644
index d65abd0..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.AdminAsynchronousProtocol;
-import org.apache.ratis.protocol.GroupManagementRequest;
-import org.apache.ratis.protocol.ServerInformationRequest;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
-import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
-
-public class AdminProtocolService extends AdminProtocolServiceImplBase {
- private final AdminAsynchronousProtocol protocol;
-
- public AdminProtocolService(AdminAsynchronousProtocol protocol) {
- this.protocol = protocol;
- }
-
- @Override
- public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) {
- final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto);
- RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request),
- ClientProtoUtils::toRaftClientReplyProto);
- }
-
- @Override
- public void serverInformation(ServerInformationRequestProto proto,
- StreamObserver<ServerInformationReplyProto> responseObserver) {
- final ServerInformationRequest request = ClientProtoUtils.toServerInformationRequest(proto);
- RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.getInfoAsync(request),
- ClientProtoUtils::toServerInformationReplyProto);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
deleted file mode 100644
index 7dfe033..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGRpcService;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.FollowerInfo;
-import org.apache.ratis.server.impl.LeaderState;
-import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A new log appender implementation using grpc bi-directional stream API.
- */
-public class GRpcLogAppender extends LogAppender {
- public static final Logger LOG = LoggerFactory.getLogger(GRpcLogAppender.class);
-
- private final RaftGRpcService rpcService;
- private final Map<Long, AppendEntriesRequestProto> pendingRequests;
- private final int maxPendingRequestsNum;
- private long callId = 0;
- private volatile boolean firstResponseReceived = false;
-
- private final TimeDuration requestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
-
- private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
-
- public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState,
- FollowerInfo f) {
- super(server, leaderState, f);
-
- this.rpcService = (RaftGRpcService) server.getServerRpc();
-
- maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
- server.getProxy().getProperties());
- requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
- pendingRequests = new ConcurrentHashMap<>();
- }
-
- private RaftServerProtocolClient getClient() throws IOException {
- return rpcService.getProxies().getProxy(follower.getPeer().getId());
- }
-
- private synchronized void resetClient(AppendEntriesRequestProto request) {
- rpcService.getProxies().resetProxy(follower.getPeer().getId());
- appendLogRequestObserver = null;
- firstResponseReceived = false;
-
- // clear the pending requests queue and reset the next index of follower
- final long nextIndex = request != null && request.hasPreviousLog()?
- request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex();
- clearPendingRequests(nextIndex);
- }
-
- @Override
- protected void runAppenderImpl() throws IOException {
- for(; isAppenderRunning(); mayWait()) {
- if (shouldSendRequest()) {
- SnapshotInfo snapshot = shouldInstallSnapshot();
- if (snapshot != null) {
- installSnapshot(snapshot);
- } else if (!shouldWait()) {
- // keep appending log entries or sending heartbeats
- appendLog();
- }
- }
- checkSlowness();
- }
-
- Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
- }
-
- private long getWaitTimeMs() {
- if (!shouldSendRequest()) {
- return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
- } else if (shouldWait()) {
- return halfMinTimeoutMs; // Should wait for a short time
- }
- return 0L;
- }
-
- private void mayWait() {
- // use lastSend time instead of lastResponse time
- final long waitTimeMs = getWaitTimeMs();
- if (waitTimeMs <= 0L) {
- return;
- }
-
- synchronized(this) {
- try {
- LOG.trace("{}: wait {}ms", this, waitTimeMs);
- wait(waitTimeMs);
- } catch(InterruptedException ie) {
- LOG.warn(this + ": Wait interrupted by " + ie);
- }
- }
- }
-
- @Override
- protected boolean shouldSendRequest() {
- return appendLogRequestObserver == null || super.shouldSendRequest();
- }
-
- /** @return true iff not received first response or queue is full. */
- private boolean shouldWait() {
- final int size = pendingRequests.size();
- if (size == 0) {
- return false;
- }
- return !firstResponseReceived || size >= maxPendingRequestsNum;
- }
-
- private void appendLog() throws IOException {
- final AppendEntriesRequestProto pending;
- final StreamObserver<AppendEntriesRequestProto> s;
- synchronized (this) {
- // prepare and enqueue the append request. note changes on follower's
- // nextIndex and ops on pendingRequests should always be associated
- // together and protected by the lock
- pending = createRequest(callId++);
- if (pending == null) {
- return;
- }
- pendingRequests.put(pending.getServerRequest().getCallId(), pending);
- updateNextIndex(pending);
- if (appendLogRequestObserver == null) {
- appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler());
- }
- s = appendLogRequestObserver;
- }
-
- if (isAppenderRunning()) {
- sendRequest(pending, s);
- }
- }
-
- private void sendRequest(AppendEntriesRequestProto request,
- StreamObserver<AppendEntriesRequestProto> s) {
- CodeInjectionForTesting.execute(RaftGRpcService.GRPC_SEND_SERVER_REQUEST,
- server.getId(), null, request);
-
- s.onNext(request);
- scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG,
- () -> "Timeout check failed for append entry request: " + request);
- follower.updateLastRpcSendTime();
- }
-
- private void timeoutAppendRequest(AppendEntriesRequestProto request) {
- AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId());
- if (pendingRequest != null) {
- LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest()));
- }
- }
-
- private void updateNextIndex(AppendEntriesRequestProto request) {
- final int count = request.getEntriesCount();
- if (count > 0) {
- follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1);
- }
- }
-
- /**
- * StreamObserver for handling responses from the follower
- */
- private class AppendLogResponseHandler
- implements StreamObserver<AppendEntriesReplyProto> {
- /**
- * After receiving a appendEntries reply, do the following:
- * 1. If the reply is success, update the follower's match index and submit
- * an event to leaderState
- * 2. If the reply is NOT_LEADER, step down
- * 3. If the reply is INCONSISTENCY, decrease the follower's next index
- * based on the response
- */
- @Override
- public void onNext(AppendEntriesReplyProto reply) {
- LOG.debug("{} received {} response from {}", server.getId(),
- (!firstResponseReceived ? "the first" : "a"),
- follower.getPeer());
-
- // update the last rpc time
- follower.updateLastRpcResponseTime();
-
- if (!firstResponseReceived) {
- firstResponseReceived = true;
- }
- switch (reply.getResult()) {
- case SUCCESS:
- onSuccess(reply);
- break;
- case NOT_LEADER:
- onNotLeader(reply);
- break;
- case INCONSISTENCY:
- onInconsistency(reply);
- break;
- default:
- break;
- }
- notifyAppend();
- }
-
- /**
- * for now we simply retry the first pending request
- */
- @Override
- public void onError(Throwable t) {
- if (!isAppenderRunning()) {
- LOG.info("{} is stopped", GRpcLogAppender.this);
- return;
- }
- RaftGrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t);
-
- long callId = RaftGrpcUtil.getCallId(t);
- resetClient(pendingRequests.get(callId));
- }
-
- @Override
- public void onCompleted() {
- LOG.info("{} stops appending log entries to follower {}", server.getId(),
- follower);
- }
- }
-
- private void clearPendingRequests(long newNextIndex) {
- pendingRequests.clear();
- follower.decreaseNextIndex(newNextIndex);
- }
-
- protected synchronized void onSuccess(AppendEntriesReplyProto reply) {
- AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
- if (request == null) {
- // If reply comes after timeout, the reply is ignored.
- LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply));
- return;
- }
- updateCommitIndex(request.getLeaderCommit());
-
- final long replyNextIndex = reply.getNextIndex();
- final long lastIndex = replyNextIndex - 1;
- final boolean updateMatchIndex;
-
- if (request.getEntriesCount() == 0) {
- Preconditions.assertTrue(!request.hasPreviousLog() ||
- lastIndex == request.getPreviousLog().getIndex(),
- "reply's next index is %s, request's previous is %s",
- replyNextIndex, request.getPreviousLog());
- updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() < lastIndex;
- } else {
- // check if the reply and the pending request is consistent
- final long lastEntryIndex = request
- .getEntries(request.getEntriesCount() - 1).getIndex();
- Preconditions.assertTrue(lastIndex == lastEntryIndex,
- "reply's next index is %s, request's last entry index is %s",
- replyNextIndex, lastEntryIndex);
- updateMatchIndex = true;
- }
- if (updateMatchIndex) {
- follower.updateMatchIndex(lastIndex);
- submitEventOnSuccessAppend();
- }
- }
-
- private void onNotLeader(AppendEntriesReplyProto reply) {
- checkResponseTerm(reply.getTerm());
- // the running loop will end and the connection will onComplete
- }
-
- private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
- AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
- if (request == null) {
- // If reply comes after timeout, the reply is ignored.
- LOG.warn("{}: Ignoring {}", server.getId(), reply);
- return;
- }
- Preconditions.assertTrue(request.hasPreviousLog());
- if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
- clearPendingRequests(reply.getNextIndex());
- }
- }
-
- private class InstallSnapshotResponseHandler
- implements StreamObserver<InstallSnapshotReplyProto> {
- private final Queue<Integer> pending;
- private final AtomicBoolean done = new AtomicBoolean(false);
-
- InstallSnapshotResponseHandler() {
- pending = new LinkedList<>();
- }
-
- synchronized void addPending(InstallSnapshotRequestProto request) {
- pending.offer(request.getRequestIndex());
- }
-
- synchronized void removePending(InstallSnapshotReplyProto reply) {
- int index = pending.poll();
- Preconditions.assertTrue(index == reply.getRequestIndex());
- }
-
- boolean isDone() {
- return done.get();
- }
-
- void close() {
- done.set(true);
- GRpcLogAppender.this.notifyAppend();
- }
-
- synchronized boolean hasAllResponse() {
- return pending.isEmpty();
- }
-
- @Override
- public void onNext(InstallSnapshotReplyProto reply) {
- LOG.debug("{} received {} response from {}", server.getId(),
- (!firstResponseReceived ? "the first" : "a"),
- follower.getPeer());
-
- // update the last rpc time
- follower.updateLastRpcResponseTime();
-
- if (!firstResponseReceived) {
- firstResponseReceived = true;
- }
-
- switch (reply.getResult()) {
- case SUCCESS:
- removePending(reply);
- break;
- case NOT_LEADER:
- checkResponseTerm(reply.getTerm());
- break;
- case UNRECOGNIZED:
- break;
- }
- }
-
- @Override
- public void onError(Throwable t) {
- if (!isAppenderRunning()) {
- LOG.info("{} is stopped", GRpcLogAppender.this);
- return;
- }
- LOG.info("{} got error when installing snapshot to {}, exception: {}",
- server.getId(), follower.getPeer(), t);
- resetClient(null);
- close();
- }
-
- @Override
- public void onCompleted() {
- LOG.info("{} stops sending snapshots to follower {}", server.getId(),
- follower);
- close();
- }
- }
-
- private void installSnapshot(SnapshotInfo snapshot) {
- LOG.info("{}: follower {}'s next index is {}," +
- " log's start index is {}, need to install snapshot",
- server.getId(), follower.getPeer(), follower.getNextIndex(),
- raftLog.getStartIndex());
-
- final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler();
- StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
- final String requestId = UUID.randomUUID().toString();
- try {
- snapshotRequestObserver = getClient().installSnapshot(responseHandler);
- for (InstallSnapshotRequestProto request :
- new SnapshotRequestIter(snapshot, requestId)) {
- if (isAppenderRunning()) {
- snapshotRequestObserver.onNext(request);
- follower.updateLastRpcSendTime();
- responseHandler.addPending(request);
- } else {
- break;
- }
- }
- snapshotRequestObserver.onCompleted();
- } catch (Exception e) {
- LOG.warn("{} failed to install snapshot {}. Exception: {}", this,
- snapshot.getFiles(), e);
- if (snapshotRequestObserver != null) {
- snapshotRequestObserver.onError(e);
- }
- return;
- }
-
- synchronized (this) {
- while (isAppenderRunning() && !responseHandler.isDone()) {
- try {
- wait();
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- if (responseHandler.hasAllResponse()) {
- follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
- follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
- LOG.info("{}: install snapshot-{} successfully on follower {}",
- server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
new file mode 100644
index 0000000..1201bf2
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.AdminAsynchronousProtocol;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.ServerInformationRequest;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
+
+public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase {
+ private final AdminAsynchronousProtocol protocol;
+
+ public GrpcAdminProtocolService(AdminAsynchronousProtocol protocol) {
+ this.protocol = protocol;
+ }
+
+ @Override
+ public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) {
+ final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto);
+ GrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request),
+ ClientProtoUtils::toRaftClientReplyProto);
+ }
+
+ @Override
+ public void serverInformation(ServerInformationRequestProto proto,
+ StreamObserver<ServerInformationReplyProto> responseObserver) {
+ final ServerInformationRequest request = ClientProtoUtils.toServerInformationRequest(proto);
+ GrpcUtil.asyncCall(responseObserver, () -> protocol.getInfoAsync(request),
+ ClientProtoUtils::toServerInformationReplyProto);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
new file mode 100644
index 0000000..3da58bf
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.impl.LeaderState;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A new log appender implementation using grpc bi-directional stream API.
+ */
+public class GrpcLogAppender extends LogAppender {
+ public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
+
+ private final GrpcService rpcService;
+ private final Map<Long, AppendEntriesRequestProto> pendingRequests;
+ private final int maxPendingRequestsNum;
+ private long callId = 0;
+ private volatile boolean firstResponseReceived = false;
+
+ private final TimeDuration requestTimeoutDuration;
+ private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+
+ private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
+
+ public GrpcLogAppender(RaftServerImpl server, LeaderState leaderState,
+ FollowerInfo f) {
+ super(server, leaderState, f);
+
+ this.rpcService = (GrpcService) server.getServerRpc();
+
+ maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
+ server.getProxy().getProperties());
+ requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
+ pendingRequests = new ConcurrentHashMap<>();
+ }
+
+ private GrpcServerProtocolClient getClient() throws IOException {
+ return rpcService.getProxies().getProxy(follower.getPeer().getId());
+ }
+
+ private synchronized void resetClient(AppendEntriesRequestProto request) {
+ rpcService.getProxies().resetProxy(follower.getPeer().getId());
+ appendLogRequestObserver = null;
+ firstResponseReceived = false;
+
+ // clear the pending requests queue and reset the next index of follower
+ final long nextIndex = request != null && request.hasPreviousLog()?
+ request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex();
+ clearPendingRequests(nextIndex);
+ }
+
+ @Override
+ protected void runAppenderImpl() throws IOException {
+ for(; isAppenderRunning(); mayWait()) {
+ if (shouldSendRequest()) {
+ SnapshotInfo snapshot = shouldInstallSnapshot();
+ if (snapshot != null) {
+ installSnapshot(snapshot);
+ } else if (!shouldWait()) {
+ // keep appending log entries or sending heartbeats
+ appendLog();
+ }
+ }
+ checkSlowness();
+ }
+
+ Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
+ }
+
+ private long getWaitTimeMs() {
+ if (!shouldSendRequest()) {
+ return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
+ } else if (shouldWait()) {
+ return halfMinTimeoutMs; // Should wait for a short time
+ }
+ return 0L;
+ }
+
+ private void mayWait() {
+ // use lastSend time instead of lastResponse time
+ final long waitTimeMs = getWaitTimeMs();
+ if (waitTimeMs <= 0L) {
+ return;
+ }
+
+ synchronized(this) {
+ try {
+ LOG.trace("{}: wait {}ms", this, waitTimeMs);
+ wait(waitTimeMs);
+ } catch(InterruptedException ie) {
+ LOG.warn(this + ": Wait interrupted by " + ie);
+ }
+ }
+ }
+
+ @Override
+ protected boolean shouldSendRequest() {
+ return appendLogRequestObserver == null || super.shouldSendRequest();
+ }
+
+ /** @return true iff not received first response or queue is full. */
+ private boolean shouldWait() {
+ final int size = pendingRequests.size();
+ if (size == 0) {
+ return false;
+ }
+ return !firstResponseReceived || size >= maxPendingRequestsNum;
+ }
+
+ private void appendLog() throws IOException {
+ final AppendEntriesRequestProto pending;
+ final StreamObserver<AppendEntriesRequestProto> s;
+ synchronized (this) {
+ // prepare and enqueue the append request. note changes on follower's
+ // nextIndex and ops on pendingRequests should always be associated
+ // together and protected by the lock
+ pending = createRequest(callId++);
+ if (pending == null) {
+ return;
+ }
+ pendingRequests.put(pending.getServerRequest().getCallId(), pending);
+ updateNextIndex(pending);
+ if (appendLogRequestObserver == null) {
+ appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler());
+ }
+ s = appendLogRequestObserver;
+ }
+
+ if (isAppenderRunning()) {
+ sendRequest(pending, s);
+ }
+ }
+
+ private void sendRequest(AppendEntriesRequestProto request,
+ StreamObserver<AppendEntriesRequestProto> s) {
+ CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
+ server.getId(), null, request);
+
+ s.onNext(request);
+ scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG,
+ () -> "Timeout check failed for append entry request: " + request);
+ follower.updateLastRpcSendTime();
+ }
+
+ private void timeoutAppendRequest(AppendEntriesRequestProto request) {
+ AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId());
+ if (pendingRequest != null) {
+ LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest()));
+ }
+ }
+
+ private void updateNextIndex(AppendEntriesRequestProto request) {
+ final int count = request.getEntriesCount();
+ if (count > 0) {
+ follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1);
+ }
+ }
+
+ /**
+ * StreamObserver for handling responses from the follower
+ */
+ private class AppendLogResponseHandler
+ implements StreamObserver<AppendEntriesReplyProto> {
+ /**
+ * After receiving a appendEntries reply, do the following:
+ * 1. If the reply is success, update the follower's match index and submit
+ * an event to leaderState
+ * 2. If the reply is NOT_LEADER, step down
+ * 3. If the reply is INCONSISTENCY, decrease the follower's next index
+ * based on the response
+ */
+ @Override
+ public void onNext(AppendEntriesReplyProto reply) {
+ LOG.debug("{} received {} response from {}", server.getId(),
+ (!firstResponseReceived ? "the first" : "a"),
+ follower.getPeer());
+
+ // update the last rpc time
+ follower.updateLastRpcResponseTime();
+
+ if (!firstResponseReceived) {
+ firstResponseReceived = true;
+ }
+ switch (reply.getResult()) {
+ case SUCCESS:
+ onSuccess(reply);
+ break;
+ case NOT_LEADER:
+ onNotLeader(reply);
+ break;
+ case INCONSISTENCY:
+ onInconsistency(reply);
+ break;
+ default:
+ break;
+ }
+ notifyAppend();
+ }
+
+ /**
+ * for now we simply retry the first pending request
+ */
+ @Override
+ public void onError(Throwable t) {
+ if (!isAppenderRunning()) {
+ LOG.info("{} is stopped", GrpcLogAppender.this);
+ return;
+ }
+ GrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t);
+
+ long callId = GrpcUtil.getCallId(t);
+ resetClient(pendingRequests.get(callId));
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.info("{} stops appending log entries to follower {}", server.getId(),
+ follower);
+ }
+ }
+
+ private void clearPendingRequests(long newNextIndex) {
+ pendingRequests.clear();
+ follower.decreaseNextIndex(newNextIndex);
+ }
+
+ protected synchronized void onSuccess(AppendEntriesReplyProto reply) {
+ AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
+ if (request == null) {
+ // If reply comes after timeout, the reply is ignored.
+ LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply));
+ return;
+ }
+ updateCommitIndex(request.getLeaderCommit());
+
+ final long replyNextIndex = reply.getNextIndex();
+ final long lastIndex = replyNextIndex - 1;
+ final boolean updateMatchIndex;
+
+ if (request.getEntriesCount() == 0) {
+ Preconditions.assertTrue(!request.hasPreviousLog() ||
+ lastIndex == request.getPreviousLog().getIndex(),
+ "reply's next index is %s, request's previous is %s",
+ replyNextIndex, request.getPreviousLog());
+ updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() < lastIndex;
+ } else {
+ // check if the reply and the pending request is consistent
+ final long lastEntryIndex = request
+ .getEntries(request.getEntriesCount() - 1).getIndex();
+ Preconditions.assertTrue(lastIndex == lastEntryIndex,
+ "reply's next index is %s, request's last entry index is %s",
+ replyNextIndex, lastEntryIndex);
+ updateMatchIndex = true;
+ }
+ if (updateMatchIndex) {
+ follower.updateMatchIndex(lastIndex);
+ submitEventOnSuccessAppend();
+ }
+ }
+
+ private void onNotLeader(AppendEntriesReplyProto reply) {
+ checkResponseTerm(reply.getTerm());
+ // the running loop will end and the connection will onComplete
+ }
+
+ private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
+ AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
+ if (request == null) {
+ // If reply comes after timeout, the reply is ignored.
+ LOG.warn("{}: Ignoring {}", server.getId(), reply);
+ return;
+ }
+ Preconditions.assertTrue(request.hasPreviousLog());
+ if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
+ clearPendingRequests(reply.getNextIndex());
+ }
+ }
+
+ private class InstallSnapshotResponseHandler
+ implements StreamObserver<InstallSnapshotReplyProto> {
+ private final Queue<Integer> pending;
+ private final AtomicBoolean done = new AtomicBoolean(false);
+
+ InstallSnapshotResponseHandler() {
+ pending = new LinkedList<>();
+ }
+
+ synchronized void addPending(InstallSnapshotRequestProto request) {
+ pending.offer(request.getRequestIndex());
+ }
+
+ synchronized void removePending(InstallSnapshotReplyProto reply) {
+ int index = pending.poll();
+ Preconditions.assertTrue(index == reply.getRequestIndex());
+ }
+
+ boolean isDone() {
+ return done.get();
+ }
+
+ void close() {
+ done.set(true);
+ GrpcLogAppender.this.notifyAppend();
+ }
+
+ synchronized boolean hasAllResponse() {
+ return pending.isEmpty();
+ }
+
+ @Override
+ public void onNext(InstallSnapshotReplyProto reply) {
+ LOG.debug("{} received {} response from {}", server.getId(),
+ (!firstResponseReceived ? "the first" : "a"),
+ follower.getPeer());
+
+ // update the last rpc time
+ follower.updateLastRpcResponseTime();
+
+ if (!firstResponseReceived) {
+ firstResponseReceived = true;
+ }
+
+ switch (reply.getResult()) {
+ case SUCCESS:
+ removePending(reply);
+ break;
+ case NOT_LEADER:
+ checkResponseTerm(reply.getTerm());
+ break;
+ case UNRECOGNIZED:
+ break;
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (!isAppenderRunning()) {
+ LOG.info("{} is stopped", GrpcLogAppender.this);
+ return;
+ }
+ LOG.info("{} got error when installing snapshot to {}, exception: {}",
+ server.getId(), follower.getPeer(), t);
+ resetClient(null);
+ close();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.info("{} stops sending snapshots to follower {}", server.getId(),
+ follower);
+ close();
+ }
+ }
+
+ private void installSnapshot(SnapshotInfo snapshot) {
+ LOG.info("{}: follower {}'s next index is {}," +
+ " log's start index is {}, need to install snapshot",
+ server.getId(), follower.getPeer(), follower.getNextIndex(),
+ raftLog.getStartIndex());
+
+ final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler();
+ StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
+ final String requestId = UUID.randomUUID().toString();
+ try {
+ snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+ for (InstallSnapshotRequestProto request :
+ new SnapshotRequestIter(snapshot, requestId)) {
+ if (isAppenderRunning()) {
+ snapshotRequestObserver.onNext(request);
+ follower.updateLastRpcSendTime();
+ responseHandler.addPending(request);
+ } else {
+ break;
+ }
+ }
+ snapshotRequestObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.warn("{} failed to install snapshot {}. Exception: {}", this,
+ snapshot.getFiles(), e);
+ if (snapshotRequestObserver != null) {
+ snapshotRequestObserver.onError(e);
+ }
+ return;
+ }
+
+ synchronized (this) {
+ while (isAppenderRunning() && !responseHandler.isDone()) {
+ try {
+ wait();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ if (responseHandler.hasAllResponse()) {
+ follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
+ follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
+ LOG.info("{}: install snapshot-{} successfully on follower {}",
+ server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
new file mode 100644
index 0000000..3b2f8ba
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.TimeDuration;
+
+import java.io.Closeable;
+
+/**
+ * This is a RaftClient implementation that supports streaming data to the raft
+ * ring. The stream implementation utilizes gRPC.
+ */
+public class GrpcServerProtocolClient implements Closeable {
+ private final ManagedChannel channel;
+ private final TimeDuration requestTimeoutDuration;
+ private final RaftServerProtocolServiceBlockingStub blockingStub;
+ private final RaftServerProtocolServiceStub asyncStub;
+
+ public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
+ TimeDuration requestTimeoutDuration) {
+ channel = NettyChannelBuilder.forTarget(target.getAddress())
+ .usePlaintext(true).flowControlWindow(flowControlWindow)
+ .build();
+ blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
+ asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
+ this.requestTimeoutDuration = requestTimeoutDuration;
+ }
+
+ @Override
+ public void close() {
+ channel.shutdownNow();
+ }
+
+ public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
+ // the StatusRuntimeException will be handled by the caller
+ RequestVoteReplyProto r =
+ blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .requestVote(request);
+ return r;
+ }
+
+ StreamObserver<AppendEntriesRequestProto> appendEntries(
+ StreamObserver<AppendEntriesReplyProto> responseHandler) {
+ return asyncStub.appendEntries(responseHandler);
+ }
+
+ StreamObserver<InstallSnapshotRequestProto> installSnapshot(
+ StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+ return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .installSnapshot(responseHandler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
new file mode 100644
index 0000000..83335b8
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
+import org.apache.ratis.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+public class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
+ public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);
+
+ private final Supplier<RaftPeerId> idSupplier;
+ private final RaftServer server;
+
+ public GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server) {
+ this.idSupplier = idSupplier;
+ this.server = server;
+ }
+
+ RaftPeerId getId() {
+ return idSupplier.get();
+ }
+
+ @Override
+ public void requestVote(RequestVoteRequestProto request,
+ StreamObserver<RequestVoteReplyProto> responseObserver) {
+ try {
+ final RequestVoteReplyProto reply = server.requestVote(request);
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ } catch (Throwable e) {
+ GrpcUtil.warn(LOG, () -> getId() + ": Failed requestVote " + ProtoUtils.toString(request.getServerRequest()), e);
+ responseObserver.onError(GrpcUtil.wrapException(e));
+ }
+ }
+
+ @Override
+ public StreamObserver<AppendEntriesRequestProto> appendEntries(
+ StreamObserver<AppendEntriesReplyProto> responseObserver) {
+ return new StreamObserver<AppendEntriesRequestProto>() {
+ private final AtomicReference<CompletableFuture<Void>> previousOnNext =
+ new AtomicReference<>(CompletableFuture.completedFuture(null));
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ @Override
+ public void onNext(AppendEntriesRequestProto request) {
+ final CompletableFuture<Void> current = new CompletableFuture<>();
+ final CompletableFuture<Void> previous = previousOnNext.getAndSet(current);
+ try {
+ server.appendEntriesAsync(request).thenCombine(previous,
+ (reply, v) -> {
+ if (!isClosed.get()) {
+ responseObserver.onNext(reply);
+ }
+ current.complete(null);
+ return null;
+ });
+ } catch (Throwable e) {
+ GrpcUtil.warn(LOG, () -> getId() + ": Failed appendEntries " + ProtoUtils.toString(request.getServerRequest()), e);
+ responseObserver.onError(GrpcUtil.wrapException(e, request.getServerRequest().getCallId()));
+ current.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // for now we just log a msg
+ GrpcUtil.warn(LOG, () -> getId() + ": appendEntries onError", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (isClosed.compareAndSet(false, true)) {
+ LOG.info("{}: appendEntries completed", getId());
+ responseObserver.onCompleted();
+ }
+ }
+ };
+ }
+
+ @Override
+ public StreamObserver<InstallSnapshotRequestProto> installSnapshot(
+ StreamObserver<InstallSnapshotReplyProto> responseObserver) {
+ return new StreamObserver<InstallSnapshotRequestProto>() {
+ @Override
+ public void onNext(InstallSnapshotRequestProto request) {
+ try {
+ final InstallSnapshotReplyProto reply = server.installSnapshot(request);
+ responseObserver.onNext(reply);
+ } catch (Throwable e) {
+ GrpcUtil.warn(LOG, () -> getId() + ": Failed installSnapshot " + ProtoUtils.toString(request.getServerRequest()), e);
+ responseObserver.onError(GrpcUtil.wrapException(e));
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ GrpcUtil.warn(LOG, () -> getId() + ": installSnapshot onError", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.info("{}: installSnapshot completed", getId());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
new file mode 100644
index 0000000..eb8310c
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.shaded.io.grpc.Server;
+import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.function.Supplier;
+
+/** A grpc implementation of {@link RaftServerRpc}. */
+public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>> {
+ static final Logger LOG = LoggerFactory.getLogger(GrpcService.class);
+ public static final String GRPC_SEND_SERVER_REQUEST =
+ GrpcService.class.getSimpleName() + ".sendRequest";
+
+ public static class Builder extends RaftServerRpc.Builder<Builder, GrpcService> {
+ private Builder() {}
+
+ @Override
+ public Builder getThis() {
+ return this;
+ }
+
+ @Override
+ public GrpcService build() {
+ return new GrpcService(getServer());
+ }
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private final Server server;
+ private final Supplier<InetSocketAddress> addressSupplier;
+
+ private GrpcService(RaftServer server) {
+ this(server, server::getId,
+ GrpcConfigKeys.Server.port(server.getProperties()),
+ GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
+ RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
+ GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
+ RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
+ }
+ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port,
+ SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
+ SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration) {
+ super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
+ p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration)));
+ if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
+ throw new IllegalArgumentException("Illegal configuration: "
+ + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
+ + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
+ }
+
+ server = NettyServerBuilder.forPort(port)
+ .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
+ .flowControlWindow(flowControlWindow.getSizeInt())
+ .addService(new GrpcServerProtocolService(idSupplier, raftServer))
+ .addService(new GrpcClientProtocolService(idSupplier, raftServer))
+ .addService(new GrpcAdminProtocolService(raftServer))
+ .build();
+ addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort()));
+ }
+
+ @Override
+ public SupportedRpcType getRpcType() {
+ return SupportedRpcType.GRPC;
+ }
+
+ @Override
+ public void startImpl() {
+ try {
+ server.start();
+ } catch (IOException e) {
+ ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
+ }
+ LOG.info("{}: {} started, listening on {}", getId(), getClass().getSimpleName(), getInetSocketAddress());
+ }
+
+ @Override
+ public void closeImpl() throws IOException {
+ final String name = getId() + ": shutdown server with port " + server.getPort();
+ LOG.info("{} now", name);
+ final Server s = server.shutdownNow();
+ super.closeImpl();
+ try {
+ s.awaitTermination();
+ } catch(InterruptedException e) {
+ throw IOUtils.toInterruptedIOException(name + " failed", e);
+ }
+ LOG.info("{} successfully", name);
+ }
+
+ @Override
+ public InetSocketAddress getInetSocketAddress() {
+ return addressSupplier.get();
+ }
+
+ @Override
+ public AppendEntriesReplyProto appendEntries(
+ AppendEntriesRequestProto request) throws IOException {
+ throw new UnsupportedOperationException(
+ "Blocking AppendEntries call is not supported");
+ }
+
+ @Override
+ public InstallSnapshotReplyProto installSnapshot(
+ InstallSnapshotRequestProto request) throws IOException {
+ throw new UnsupportedOperationException(
+ "Blocking InstallSnapshot call is not supported");
+ }
+
+ @Override
+ public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
+ throws IOException {
+ CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(),
+ null, request);
+
+ final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
+ return getProxies().getProxy(target).requestVote(request);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
deleted file mode 100644
index b801c2a..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.util.TimeDuration;
-
-import java.io.Closeable;
-
-/**
- * This is a RaftClient implementation that supports streaming data to the raft
- * ring. The stream implementation utilizes gRPC.
- */
-public class RaftServerProtocolClient implements Closeable {
- private final ManagedChannel channel;
- private final TimeDuration requestTimeoutDuration;
- private final RaftServerProtocolServiceBlockingStub blockingStub;
- private final RaftServerProtocolServiceStub asyncStub;
-
- public RaftServerProtocolClient(RaftPeer target, int flowControlWindow,
- TimeDuration requestTimeoutDuration) {
- channel = NettyChannelBuilder.forTarget(target.getAddress())
- .usePlaintext(true).flowControlWindow(flowControlWindow)
- .build();
- blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
- asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
- this.requestTimeoutDuration = requestTimeoutDuration;
- }
-
- @Override
- public void close() {
- channel.shutdownNow();
- }
-
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
- // the StatusRuntimeException will be handled by the caller
- RequestVoteReplyProto r =
- blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .requestVote(request);
- return r;
- }
-
- StreamObserver<AppendEntriesRequestProto> appendEntries(
- StreamObserver<AppendEntriesReplyProto> responseHandler) {
- return asyncStub.appendEntries(responseHandler);
- }
-
- StreamObserver<InstallSnapshotRequestProto> installSnapshot(
- StreamObserver<InstallSnapshotReplyProto> responseHandler) {
- return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .installSnapshot(responseHandler);
- }
-}