You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2021/01/04 07:03:53 UTC
[incubator-ratis] branch master updated: RATIS-1280. Validate
RoutingTable before building it. (#390)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 841a04c RATIS-1280. Validate RoutingTable before building it. (#390)
841a04c is described below
commit 841a04c37416510abc87fd150d35bb511c6b7cb7
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Jan 4 15:03:46 2021 +0800
RATIS-1280. Validate RoutingTable before building it. (#390)
---
.../org/apache/ratis/protocol/RoutingTable.java | 96 +++++++++++++++++++++-
.../src/test/java/org/apache/ratis/BaseTest.java | 16 ----
.../filestore/FileStoreStreamingBaseTest.java | 7 +-
.../ratis/datastream/DataStreamBaseTest.java | 2 +-
.../ratis/datastream/DataStreamClusterTests.java | 6 ++
.../ratis/datastream/DataStreamTestUtils.java | 23 +++++-
.../apache/ratis/protocol/TestRoutingTable.java | 91 ++++++++++++++++++++
7 files changed, 216 insertions(+), 25 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
index 2083f42..d6fc1a5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,8 @@
package org.apache.ratis.protocol;
import org.apache.ratis.proto.RaftProtos.RoutingTableProto;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import java.util.Arrays;
@@ -26,20 +28,30 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+/**
+ * A routing table is a directed acyclic graph containing exactly one primary peer such that
+ * (1) the primary peer is the only starting peer, and
+ * (2) all the other peers can be reached from the primary peer by exactly one path.
+ */
public interface RoutingTable {
+ /** @return the successor peers of the given peer. */
Set<RaftPeerId> getSuccessors(RaftPeerId peerId);
+ /** @return the proto of this {@link RoutingTable}. */
RoutingTableProto toProto();
+ /** @return a new builder to build a {@link RoutingTable}. */
static Builder newBuilder() {
return new Builder();
}
-
+ /** To build a {@link RoutingTable}. */
class Builder {
private final AtomicReference<Map<RaftPeerId, Set<RaftPeerId>>> ref = new AtomicReference<>(new HashMap<>());
@@ -70,9 +82,87 @@ public interface RoutingTable {
.map(RoutingTable::newRoutingTable)
.orElseThrow(() -> new IllegalStateException("RoutingTable Already built"));
}
+
+ static void validate(Map<RaftPeerId, Set<RaftPeerId>> map) {
+ if (map != null && !map.isEmpty()) {
+ new Builder.Validation(map).run();
+ }
+ }
+
+ /** Validate if a map represents a valid routing table. */
+ private static final class Validation {
+ private final Map<RaftPeerId, Set<RaftPeerId>> map;
+ private final RaftPeerId primary;
+ private final Set<RaftPeerId> unreachablePeers;
+
+ private Validation(Map<RaftPeerId, Set<RaftPeerId>> map) {
+ this.map = Objects.requireNonNull(map, "map == null");
+
+ final Set<RaftPeerId> allPeers = new HashSet<>(map.keySet());
+ final Set<RaftPeerId> startingPeers = new HashSet<>(map.keySet());
+ int numEdges = 0;
+ for (Map.Entry<RaftPeerId, Set<RaftPeerId>> entry: map.entrySet()) {
+ final Set<RaftPeerId> successors = entry.getValue();
+ if (successors == null) {
+ continue;
+ }
+ for (RaftPeerId s : successors) {
+ Preconditions.assertTrue(!s.equals(entry.getKey()), () -> "Invalid routing table: the peer " + s
+ + " has a self-loop, " + this);
+
+ if (!startingPeers.remove(s)) { //the primary peer cannot be a successor
+ final boolean added = allPeers.add(s); //an ending peer may not be contained as a key in the map
+ Preconditions.assertTrue(added, () -> "Invalid routing table: the peer " + s
+ + " has more than one predecessors, " + this);
+ }
+ }
+ numEdges += successors.size();
+ }
+
+ Preconditions.assertTrue(numEdges == allPeers.size() - 1,
+ "Invalid routing table: #edges = %d != #vertices - 1, #vertices=%d, %s",
+ numEdges, allPeers.size(), this);
+ Preconditions.assertTrue(!startingPeers.isEmpty(),
+ () -> "Invalid routing table: Starting peer not found, " + this);
+ Preconditions.assertTrue(startingPeers.size() == 1,
+ () -> "Invalid routing table: More than one starting peers: " + startingPeers + ", " + this);
+
+ this.primary = startingPeers.iterator().next();
+ this.unreachablePeers = allPeers;
+ }
+
+ private void run() {
+ depthFirstSearch(primary);
+ Preconditions.assertTrue(unreachablePeers.isEmpty() ,
+ () -> "Invalid routing table: peer(s) " + unreachablePeers + " are unreachable, " + this);
+ }
+
+ private void depthFirstSearch(RaftPeerId current) {
+ final boolean removed = unreachablePeers.remove(current);
+ Preconditions.assertTrue(removed, () -> "Invalid routing table: the peer " + current
+ + " has more than one predecessors, " + this);
+ for (RaftPeerId successor : get(current)) {
+ depthFirstSearch(successor);
+ }
+ }
+
+ private Set<RaftPeerId> get(RaftPeerId peerId) {
+ return Optional.ofNullable(map.get(peerId)).orElseGet(Collections::emptySet);
+ }
+
+ @Override
+ public String toString() {
+ return "primary=" + primary + ", map=" + map;
+ }
+ }
}
+ /** @return a new {@link RoutingTable} represented by the given map. */
static RoutingTable newRoutingTable(Map<RaftPeerId, Set<RaftPeerId>> map){
+ Builder.validate(map);
+
+ final Supplier<RoutingTableProto> proto = JavaUtils.memoize(
+ () -> RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build());
return new RoutingTable() {
@Override
public Set<RaftPeerId> getSuccessors(RaftPeerId peerId) {
@@ -81,7 +171,7 @@ public interface RoutingTable {
@Override
public RoutingTableProto toProto() {
- return RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build();
+ return proto.get();
}
};
}
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 058d5ea..c77b01f 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -20,7 +20,6 @@ package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
@@ -38,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -81,20 +79,6 @@ public abstract class BaseTest {
return peersWithPriority;
}
- public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
- RoutingTable.Builder builder = RoutingTable.newBuilder();
- RaftPeer previous = primary;
- for (RaftPeer peer : peers) {
- if (peer.equals(primary)) {
- continue;
- }
- builder.addSuccessor(previous.getId(), peer.getId());
- previous = peer;
- }
-
- return builder.build();
- }
-
@After
public void assertNoFailures() {
final Throwable e = firstException.get();
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index bd10d28..92147de 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,6 +21,7 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RoutingTable;
@@ -73,7 +74,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
final CheckedSupplier<FileStoreClient, IOException> newClient =
() -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
- RoutingTable routingTable = getRoutingTable(peers, primary);
+ RoutingTable routingTable = DataStreamTestUtils.getRoutingTableChainTopology(peers, primary);
testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient, routingTable);
testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient, routingTable);
testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient, routingTable);
@@ -95,7 +96,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
final CheckedSupplier<FileStoreClient, IOException> newClient =
() -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
- RoutingTable routingTable = getRoutingTable(peers, primary);
+ RoutingTable routingTable = DataStreamTestUtils.getRoutingTableChainTopology(peers, primary);
testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient, routingTable);
testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient, routingTable);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index ac0e9c1..b3db0e0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -133,7 +133,7 @@ abstract class DataStreamBaseTest extends BaseTest {
throws IOException {
try (final RaftClient client = newRaftClientForDataStream(clientId)) {
final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
- .stream(null, getRoutingTable(peers, getPrimaryServer().getPeer()));
+ .stream(null, DataStreamTestUtils.getRoutingTableChainTopology(peers, getPrimaryServer().getPeer()));
if (headerException != null) {
final DataStreamReply headerReply = out.getHeaderFuture().join();
Assert.assertFalse(headerReply.isSuccess());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index c9625a6..fd47045 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -19,6 +19,7 @@ package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
@@ -38,6 +39,7 @@ import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
@@ -51,6 +53,10 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
public static final int NUM_SERVERS = 3;
+ RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
+ return DataStreamTestUtils.getRoutingTableChainTopology(peers, primary);
+ }
+
@Test
public void testStreamWrites() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 14d5fe5..b00e389 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -33,11 +33,11 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientMessage;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -46,6 +46,7 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.junit.Assert;
@@ -123,6 +124,24 @@ public interface DataStreamTestUtils {
return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
}
+ static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeer> peers, RaftPeer primary) {
+ return getRoutingTableChainTopology(CollectionUtils.as(peers, RaftPeer::getId), primary.getId());
+ }
+
+ static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeerId> peers, RaftPeerId primary) {
+ final RoutingTable.Builder builder = RoutingTable.newBuilder();
+ RaftPeerId previous = primary;
+ for (RaftPeerId peer : peers) {
+ if (peer.equals(primary)) {
+ continue;
+ }
+ builder.addSuccessor(previous, peer);
+ previous = peer;
+ }
+
+ return builder.build();
+ }
+
class MultiDataStreamStateMachine extends BaseStateMachine {
private final ConcurrentMap<ClientInvocationId, SingleDataStream> streams = new ConcurrentHashMap<>();
diff --git a/ratis-test/src/test/java/org/apache/ratis/protocol/TestRoutingTable.java b/ratis-test/src/test/java/org/apache/ratis/protocol/TestRoutingTable.java
new file mode 100644
index 0000000..58bdf07
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/protocol/TestRoutingTable.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.BaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRoutingTable extends BaseTest {
+ @Override
+ public int getGlobalTimeoutSeconds() {
+ return 1;
+ }
+
+ private final RaftPeerId[] peers = new RaftPeerId[10];
+
+ {
+ for(int i = 0; i < peers.length; i++) {
+ peers[i] = RaftPeerId.valueOf("s" + i);
+ }
+ }
+
+ @Test
+ public void testRoutingTableValidation() {
+ { // empty table
+ newRoutingTable();
+ }
+
+ { // 0 -> 1 -> 2
+ newRoutingTable(0, 1, 1, 2);
+ }
+
+ { // 0 -> 1, 0 -> 2
+ newRoutingTable(0, 1, 0, 2);
+ }
+
+ testFailureCase(" #edges < #vertices - 1", 0, 1, 1, 2, 3, 4);
+
+ testFailureCase(" #edges > #vertices - 1", 0, 1, 1, 2, 2, 0);
+
+ testFailureCase(">1 predecessors", 0, 1, 1, 2, 3, 4, 4, 1);
+
+ testFailureCase("unreachable", 0, 1, 1, 2, 2, 0, 3, 4);
+
+ testFailureCase("self-loop", 0, 1, 2, 3, 3, 3);
+ }
+
+ RoutingTable newRoutingTable(int... peerIndices) {
+ final RoutingTable.Builder b = RoutingTable.newBuilder();
+ for (int i = 0; i < peerIndices.length; i += 2) {
+ b.addSuccessor(peers[peerIndices[i]], peers[peerIndices[i + 1]]);
+ }
+ return b.build();
+ }
+
+ void testFailureCase(String name, int... peerIndices) {
+ Assert.assertEquals(0, peerIndices.length % 2);
+
+ testFailureCase(name + ": " + toString(peerIndices),
+ () -> newRoutingTable(peerIndices),
+ IllegalStateException.class, LOG);
+ }
+
+ String toString(int... peerIndices) {
+ Assert.assertEquals(0, peerIndices.length % 2);
+ if (peerIndices.length == 0) {
+ return "<empty>";
+ }
+ final StringBuilder b = new StringBuilder();
+ b.append(peerIndices[0]).append("->").append(peerIndices[1]);
+ for (int i = 2; i < peerIndices.length; i += 2) {
+ b.append(", ").append(peerIndices[i]).append("->").append(peerIndices[i + 1]);
+ }
+ return b.toString();
+ }
+}