You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/09/04 21:16:29 UTC
incubator-ratis git commit: Ratis-105. Server should check group id
for client requests. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master fd230c448 -> 4dbe99bbd
Ratis-105. Server should check group id for client requests. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4dbe99bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4dbe99bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4dbe99bb
Branch: refs/heads/master
Commit: 4dbe99bbd1c5fc7e2d9a6eba4d6df5726b68a9c8
Parents: fd230c4
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Sep 4 14:16:22 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Sep 4 14:16:22 2017 -0700
----------------------------------------------------------------------
.../ratis/protocol/GroupMismatchException.java | 28 ++++++++
.../org/apache/ratis/TestMultiRaftGroup.java | 76 ++++++++++++++++++++
.../ratis/server/impl/PendingRequests.java | 7 +-
.../ratis/server/impl/RaftServerImpl.java | 12 ++++
.../ratis/server/impl/StateMachineUpdater.java | 4 ++
.../java/org/apache/ratis/MiniRaftCluster.java | 4 ++
.../java/org/apache/ratis/RaftBasicTests.java | 4 +-
.../server/impl/ReinitializationBaseTest.java | 36 +++++++---
.../ratis/statemachine/TestStateMachine.java | 2 +-
9 files changed, 157 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java
new file mode 100644
index 0000000..af60825
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * Thrown when the group id carried by a request does not match the group id
+ * of the server that received the request.
+ */
+public class GroupMismatchException extends RaftException {
+ public GroupMismatchException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
new file mode 100644
index 0000000..8d3966e
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.RaftExamplesTestUtil;
+import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
+import org.apache.ratis.examples.arithmetic.TestArithmetic;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+import org.apache.ratis.util.CheckedBiConsumer;
+import org.apache.ratis.util.LogUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(Parameterized.class)
+public class TestMultiRaftGroup extends BaseTest {
+ static {
+ LogUtils.setLogLevel(RaftServerImpl.LOG, Level.TRACE); // log RaftServerImpl at TRACE level for this test
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() throws IOException {
+ return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class, 0); // NOTE(review): meaning of the 0 argument is not visible here - confirm
+ }
+
+ @Parameterized.Parameter
+ public MiniRaftCluster cluster; // injected by the Parameterized runner from the entries returned by data()
+
+ @Test
+ public void testMultiRaftGroup() throws Exception {
+ runTestMultiRaftGroup(3, 6, 9, 12, 15); // idIndex values; interpreted by ReinitializationBaseTest.runTestReinitializeMultiGroups
+ }
+
+ private void runTestMultiRaftGroup(int... idIndex) throws Exception {
+ runTestMultiRaftGroup(idIndex, -1); // chosen = -1: the callee picks a random index into idIndex when chosen < 0
+ }
+
+ private final AtomicInteger start = new AtomicInteger(3); // next start value handed to runTestPythagorean; advanced by 2*count on every check
+ private final int count = 10; // count argument passed to runTestPythagorean on every check
+
+ private void runTestMultiRaftGroup(int[] idIndex, int chosen) throws Exception {
+
+ final CheckedBiConsumer<MiniRaftCluster, RaftGroup, IOException> checker = (cluster, group) -> { // called back with a group by runTestReinitializeMultiGroups
+ try (final RaftClient client = cluster.createClient(group)) { // client is closed when this block exits
+ TestArithmetic.runTestPythagorean(client, start.getAndAdd(2*count), count);
+ }
+ };
+
+ ReinitializationBaseTest.runTestReinitializeMultiGroups(
+ cluster, idIndex, chosen, checker);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 98bc0a7..b7b8a9e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@@ -45,7 +44,10 @@ class PendingRequests {
TransactionContext entry) {
// externally synced for now
Preconditions.assertTrue(!request.isReadOnly());
- Preconditions.assertTrue(last == null || index == last.getIndex() + 1);
+ if (last != null && !(last.getRequest() instanceof SetConfigurationRequest)) {
+ Preconditions.assertTrue(index == last.getIndex() + 1,
+ () -> "index = " + index + " != last.getIndex() + 1, last=" + last);
+ }
return add(index, request, entry);
}
@@ -60,6 +62,7 @@ class PendingRequests {
PendingRequest addConfRequest(SetConfigurationRequest request) {
Preconditions.assertTrue(pendingSetConf == null);
pendingSetConf = new PendingRequest(request);
+ last = pendingSetConf;
return pendingSetConf;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 14acfb4..4463914 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -398,6 +398,14 @@ public class RaftServerImpl implements RaftServerProtocol,
expected);
}
+ void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException { // rejects a requestor whose group id differs from this server's groupId
+ if (!groupId.equals(requestorGroupId)) {
+ throw new GroupMismatchException(getId() // message names the requestor, its group, this server and this server's group
+ + ": The group (" + requestorGroupId + ") of requestor " + requestorId
+ + " does not match the group (" + groupId + ") of the server " + getId());
+ }
+ }
+
/**
* Handle a normal update request from client.
*/
@@ -595,6 +603,7 @@ public class RaftServerImpl implements RaftServerProtocol,
LOG.debug("{}: receive requestVote({}, {}, {}, {})",
getId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
assertLifeCycleState(RUNNING);
+ assertGroup(candidateId, candidateGroupId);
boolean voteGranted = false;
boolean shouldShutdown = false;
@@ -699,6 +708,7 @@ public class RaftServerImpl implements RaftServerProtocol,
+ initializing + ServerProtoUtils.toString(entries));
assertLifeCycleState(STARTING, RUNNING);
+ assertGroup(leaderId, leaderGroupId);
try {
validateEntries(leaderTerm, previous, entries);
@@ -792,11 +802,13 @@ public class RaftServerImpl implements RaftServerProtocol,
InstallSnapshotRequestProto request) throws IOException {
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
+ final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(),
leaderId, request);
LOG.debug("{}: receive installSnapshot({})", getId(), request);
assertLifeCycleState(STARTING, RUNNING);
+ assertGroup(leaderId, leaderGroupId);
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 7ed6731..9ef6ce7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -86,6 +86,10 @@ class StateMachineUpdater implements Runnable {
state = State.STOP;
updater.interrupt();
try {
+ updater.join();
+ } catch (InterruptedException ignored) {
+ }
+ try {
stateMachine.close();
} catch (IOException ignored) {
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index b527a58..79cb9bb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -422,6 +422,10 @@ public abstract class MiniRaftCluster {
return createClient(null, group);
}
+ public RaftClient createClient(RaftGroup g) { // creates a client for group g without specifying a leader id (passes null)
+ return createClient(null, g);
+ }
+
public RaftClient createClient(RaftPeerId leaderId) {
return createClient(leaderId, group);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index b227e47..7e08809 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -80,7 +80,7 @@ public abstract class RaftBasicTests extends BaseTest {
LOG.info(cluster.printServers());
final SimpleMessage[] messages = SimpleMessage.create(10);
- try(final RaftClient client = cluster.createClient(null)) {
+ try(final RaftClient client = cluster.createClient()) {
for (SimpleMessage message : messages) {
client.send(message);
}
@@ -149,7 +149,7 @@ public abstract class RaftBasicTests extends BaseTest {
final List<Client4TestWithLoad> clients
= Stream.iterate(0, i -> i+1).limit(numClients)
- .map(i -> cluster.createClient(null))
+ .map(i -> cluster.createClient())
.map(c -> new Client4TestWithLoad(c, numMessages))
.collect(Collectors.toList());
clients.forEach(Thread::start);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
index 5bc8dbe..d9068d1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
@@ -27,10 +27,13 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
@@ -42,7 +45,9 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public abstract class ReinitializationBaseTest extends BaseTest {
- static {
+ static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class);
+
+ {
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
@@ -78,7 +83,7 @@ public abstract class ReinitializationBaseTest extends BaseTest {
// Reinitialize servers
final RaftGroup newGroup = new RaftGroup(groupId, cluster.getPeers());
- final RaftClient client = cluster.createClient(null, newGroup);
+ final RaftClient client = cluster.createClient(newGroup);
for(RaftPeer p : newGroup.getPeers()) {
client.reinitialize(newGroup, p.getId());
}
@@ -106,8 +111,15 @@ public abstract class ReinitializationBaseTest extends BaseTest {
private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception {
printThreadCount(null, "init");
- final MiniRaftCluster cluster = getCluster(0);
+ runTestReinitializeMultiGroups(getCluster(0), idIndex, chosen, NOOP);
+ }
+
+ static final CheckedBiConsumer<MiniRaftCluster, RaftGroup, RuntimeException> NOOP = (c, g) -> {};
+ public static <T extends Throwable> void runTestReinitializeMultiGroups(
+ MiniRaftCluster cluster, int[] idIndex, int chosen,
+ CheckedBiConsumer<MiniRaftCluster, RaftGroup, T> checker)
+ throws IOException, InterruptedException, T {
if (chosen < 0) {
chosen = ThreadLocalRandom.current().nextInt(idIndex.length);
}
@@ -147,6 +159,7 @@ public abstract class ReinitializationBaseTest extends BaseTest {
}
}
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid));
+ checker.accept(cluster, groups[i]);
}
printThreadCount(type, "start groups");
LOG.info("start groups: " + cluster.printServers());
@@ -171,22 +184,23 @@ public abstract class ReinitializationBaseTest extends BaseTest {
// update chosen group to use all the peers
final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId());
- final RaftPeer[] array = allPeers.toArray(RaftPeer.EMPTY_PEERS);
for(int i = 0; i < groups.length; i++) {
- LOG.info(i + ") update " + cluster.printServers(groups[i].getGroupId()));
- if (i == chosen) {
- try (final RaftClient client = cluster.createClient(null, groups[i])) {
- client.setConfiguration(array);
- }
- } else {
- for(RaftPeer p : groups[i].getPeers()) {
+ if (i != chosen) {
+ LOG.info(i + ") reinitialize: " + cluster.printServers(groups[i].getGroupId()));
+ for (RaftPeer p : groups[i].getPeers()) {
try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
client.reinitialize(newGroup, p.getId());
}
}
}
}
+ LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId()));
+ try (final RaftClient client = cluster.createClient(groups[chosen])) {
+ client.setConfiguration(allPeers.toArray(RaftPeer.EMPTY_PEERS));
+ }
+
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+ checker.accept(cluster, groups[chosen]);
LOG.info("update groups: " + cluster.printServers());
printThreadCount(type, "update groups");
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4dbe99bb/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index f41e764..73ce69d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -156,7 +156,7 @@ public class TestStateMachine extends BaseTest {
int numTrx = 100;
final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx);
- try(final RaftClient client = cluster.createClient(null)) {
+ try(final RaftClient client = cluster.createClient()) {
for (RaftTestUtil.SimpleMessage message : messages) {
client.send(message);
}