You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2021/01/04 07:03:53 UTC

[incubator-ratis] branch master updated: RATIS-1280. Validate RoutingTable before building it. (#390)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 841a04c  RATIS-1280. Validate RoutingTable before building it. (#390)
841a04c is described below

commit 841a04c37416510abc87fd150d35bb511c6b7cb7
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Jan 4 15:03:46 2021 +0800

    RATIS-1280. Validate RoutingTable before building it. (#390)
---
 .../org/apache/ratis/protocol/RoutingTable.java    | 96 +++++++++++++++++++++-
 .../src/test/java/org/apache/ratis/BaseTest.java   | 16 ----
 .../filestore/FileStoreStreamingBaseTest.java      |  7 +-
 .../ratis/datastream/DataStreamBaseTest.java       |  2 +-
 .../ratis/datastream/DataStreamClusterTests.java   |  6 ++
 .../ratis/datastream/DataStreamTestUtils.java      | 23 +++++-
 .../apache/ratis/protocol/TestRoutingTable.java    | 91 ++++++++++++++++++++
 7 files changed, 216 insertions(+), 25 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
index 2083f42..d6fc1a5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,8 @@
 package org.apache.ratis.protocol;
 
 import org.apache.ratis.proto.RaftProtos.RoutingTableProto;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 
 import java.util.Arrays;
@@ -26,20 +28,30 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
+/**
+ * A routing table is a directed acyclic graph containing exactly one primary peer such that
+ * (1) the primary peer is the only starting peer, and
+ * (2) all the other peers can be reached from the primary peer by exactly one path.
+ */
 public interface RoutingTable {
+  /** @return the successor peers of the given peer. */
   Set<RaftPeerId> getSuccessors(RaftPeerId peerId);
 
+  /** @return the proto of this {@link RoutingTable}. */
   RoutingTableProto toProto();
 
+  /** @return a new builder to build a {@link RoutingTable}. */
   static Builder newBuilder() {
     return new Builder();
   }
 
-
+  /** To build a {@link RoutingTable}. */
   class Builder {
     private final AtomicReference<Map<RaftPeerId, Set<RaftPeerId>>> ref = new AtomicReference<>(new HashMap<>());
 
@@ -70,9 +82,87 @@ public interface RoutingTable {
           .map(RoutingTable::newRoutingTable)
           .orElseThrow(() -> new IllegalStateException("RoutingTable Already built"));
     }
+
+    static void validate(Map<RaftPeerId, Set<RaftPeerId>> map) {
+      if (map != null && !map.isEmpty()) {
+        new Builder.Validation(map).run();
+      }
+    }
+
+    /** Validate if a map represents a valid routing table. */
+    private static final class Validation {
+      private final Map<RaftPeerId, Set<RaftPeerId>> map;
+      private final RaftPeerId primary;
+      private final Set<RaftPeerId> unreachablePeers;
+
+      private Validation(Map<RaftPeerId, Set<RaftPeerId>> map) {
+        this.map = Objects.requireNonNull(map, "map == null");
+
+        final Set<RaftPeerId> allPeers = new HashSet<>(map.keySet());
+        final Set<RaftPeerId> startingPeers = new HashSet<>(map.keySet());
+        int numEdges = 0;
+        for (Map.Entry<RaftPeerId, Set<RaftPeerId>> entry: map.entrySet()) {
+          final Set<RaftPeerId> successors = entry.getValue();
+          if (successors == null) {
+            continue;
+          }
+          for (RaftPeerId s : successors) {
+            Preconditions.assertTrue(!s.equals(entry.getKey()), () -> "Invalid routing table: the peer " + s
+                + " has a self-loop, " + this);
+
+            if (!startingPeers.remove(s)) { //the primary peer cannot be a successor
+              final boolean added = allPeers.add(s); //an ending peer may not be contained as a key in the map
+              Preconditions.assertTrue(added, () -> "Invalid routing table: the peer " + s
+                  + " has more than one predecessors, " + this);
+            }
+          }
+          numEdges += successors.size();
+        }
+
+        Preconditions.assertTrue(numEdges == allPeers.size() - 1,
+            "Invalid routing table: #edges = %d != #vertices - 1, #vertices=%d, %s",
+            numEdges, allPeers.size(), this);
+        Preconditions.assertTrue(!startingPeers.isEmpty(),
+            () -> "Invalid routing table: Starting peer not found, " + this);
+        Preconditions.assertTrue(startingPeers.size() == 1,
+            () -> "Invalid routing table: More than one starting peers: " + startingPeers + ", " + this);
+
+        this.primary = startingPeers.iterator().next();
+        this.unreachablePeers = allPeers;
+      }
+
+      private void run() {
+        depthFirstSearch(primary);
+        Preconditions.assertTrue(unreachablePeers.isEmpty() ,
+            () -> "Invalid routing table: peer(s) " + unreachablePeers +  " are unreachable, " + this);
+      }
+
+      private void depthFirstSearch(RaftPeerId current) {
+        final boolean removed = unreachablePeers.remove(current);
+        Preconditions.assertTrue(removed, () -> "Invalid routing table: the peer " + current
+            + " has more than one predecessors, " + this);
+        for (RaftPeerId successor : get(current)) {
+          depthFirstSearch(successor);
+        }
+      }
+
+      private Set<RaftPeerId> get(RaftPeerId peerId) {
+        return Optional.ofNullable(map.get(peerId)).orElseGet(Collections::emptySet);
+      }
+
+      @Override
+      public String toString() {
+        return "primary=" + primary + ", map=" + map;
+      }
+    }
   }
 
+  /** @return a new {@link RoutingTable} represented by the given map. */
   static RoutingTable newRoutingTable(Map<RaftPeerId, Set<RaftPeerId>> map){
+    Builder.validate(map);
+
+    final Supplier<RoutingTableProto> proto = JavaUtils.memoize(
+        () -> RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build());
     return new RoutingTable() {
       @Override
       public Set<RaftPeerId> getSuccessors(RaftPeerId peerId) {
@@ -81,7 +171,7 @@ public interface RoutingTable {
 
       @Override
       public RoutingTableProto toProto() {
-        return RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build();
+        return proto.get();
       }
     };
   }
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 058d5ea..c77b01f 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -20,7 +20,6 @@ package org.apache.ratis;
 import org.apache.log4j.Level;
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -38,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -81,20 +79,6 @@ public abstract class BaseTest {
     return peersWithPriority;
   }
 
-  public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
-    RoutingTable.Builder builder = RoutingTable.newBuilder();
-    RaftPeer previous = primary;
-    for (RaftPeer peer : peers) {
-      if (peer.equals(primary)) {
-        continue;
-      }
-      builder.addSuccessor(previous.getId(), peer.getId());
-      previous = peer;
-    }
-
-    return builder.build();
-  }
-
   @After
   public void assertNoFailures() {
     final Throwable e = firstException.get();
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index bd10d28..92147de 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,6 +21,7 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RoutingTable;
@@ -73,7 +74,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
     final CheckedSupplier<FileStoreClient, IOException> newClient =
         () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
 
-    RoutingTable routingTable = getRoutingTable(peers, primary);
+    RoutingTable routingTable = DataStreamTestUtils.getRoutingTableChainTopology(peers, primary);
     testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient, routingTable);
     testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient, routingTable);
     testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient, routingTable);
@@ -95,7 +96,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
     final CheckedSupplier<FileStoreClient, IOException> newClient =
         () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
 
-    RoutingTable routingTable = getRoutingTable(peers, primary);
+    RoutingTable routingTable = DataStreamTestUtils.getRoutingTableChainTopology(peers, primary);
     testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient, routingTable);
     testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient, routingTable);
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index ac0e9c1..b3db0e0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -133,7 +133,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream(clientId)) {
       final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
-          .stream(null, getRoutingTable(peers, getPrimaryServer().getPeer()));
+          .stream(null, DataStreamTestUtils.getRoutingTableChainTopology(peers, getPrimaryServer().getPeer()));
       if (headerException != null) {
         final DataStreamReply headerReply = out.getHeaderFuture().join();
         Assert.assertFalse(headerReply.isSuccess());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index c9625a6..fd47045 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -19,6 +19,7 @@ package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
@@ -38,6 +39,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -51,6 +53,10 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
 
   public static final int NUM_SERVERS = 3;
 
+  RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
+    return DataStreamTestUtils.getRoutingTableChainTopology(peers, primary);
+  }
+
   @Test
   public void testStreamWrites() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 14d5fe5..b00e389 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -33,11 +33,11 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientMessage;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -46,6 +46,7 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.junit.Assert;
@@ -123,6 +124,24 @@ public interface DataStreamTestUtils {
     return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
   }
 
+  static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeer> peers, RaftPeer primary) {
+    return getRoutingTableChainTopology(CollectionUtils.as(peers, RaftPeer::getId), primary.getId());
+  }
+
+  static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeerId> peers, RaftPeerId primary) {
+    final RoutingTable.Builder builder = RoutingTable.newBuilder();
+    RaftPeerId previous = primary;
+    for (RaftPeerId peer : peers) {
+      if (peer.equals(primary)) {
+        continue;
+      }
+      builder.addSuccessor(previous, peer);
+      previous = peer;
+    }
+
+    return builder.build();
+  }
+
   class MultiDataStreamStateMachine extends BaseStateMachine {
     private final ConcurrentMap<ClientInvocationId, SingleDataStream> streams = new ConcurrentHashMap<>();
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/protocol/TestRoutingTable.java b/ratis-test/src/test/java/org/apache/ratis/protocol/TestRoutingTable.java
new file mode 100644
index 0000000..58bdf07
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/protocol/TestRoutingTable.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.BaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRoutingTable extends BaseTest {
+  @Override
+  public int getGlobalTimeoutSeconds() {
+    return 1;
+  }
+
+  private final RaftPeerId[] peers = new RaftPeerId[10];
+
+  {
+    for(int i = 0; i < peers.length; i++) {
+      peers[i] = RaftPeerId.valueOf("s" + i);
+    }
+  }
+
+  @Test
+  public void testRoutingTableValidation() {
+    { // empty table
+      newRoutingTable();
+    }
+
+    { // 0 -> 1 -> 2
+      newRoutingTable(0, 1, 1, 2);
+    }
+
+    { // 0 -> 1, 0 -> 2
+      newRoutingTable(0, 1, 0, 2);
+    }
+
+    testFailureCase(" #edges < #vertices - 1", 0, 1, 1, 2, 3, 4);
+
+    testFailureCase(" #edges > #vertices - 1", 0, 1, 1, 2, 2, 0);
+
+    testFailureCase(">1 predecessors", 0, 1, 1, 2, 3, 4, 4, 1);
+
+    testFailureCase("unreachable", 0, 1, 1, 2, 2, 0, 3, 4);
+
+    testFailureCase("self-loop", 0, 1, 2, 3, 3, 3);
+  }
+
+  RoutingTable newRoutingTable(int... peerIndices) {
+    final RoutingTable.Builder b = RoutingTable.newBuilder();
+    for (int i = 0; i < peerIndices.length; i += 2) {
+      b.addSuccessor(peers[peerIndices[i]], peers[peerIndices[i + 1]]);
+    }
+    return b.build();
+  }
+
+  void testFailureCase(String name, int... peerIndices) {
+    Assert.assertEquals(0, peerIndices.length % 2);
+
+    testFailureCase(name + ": " + toString(peerIndices),
+        () -> newRoutingTable(peerIndices),
+        IllegalStateException.class, LOG);
+  }
+
+  String toString(int... peerIndices) {
+    Assert.assertEquals(0, peerIndices.length % 2);
+    if (peerIndices.length == 0) {
+      return "<empty>";
+    }
+    final StringBuilder b = new StringBuilder();
+    b.append(peerIndices[0]).append("->").append(peerIndices[1]);
+    for (int i = 2; i < peerIndices.length; i += 2) {
+      b.append(", ").append(peerIndices[i]).append("->").append(peerIndices[i + 1]);
+    }
+    return b.toString();
+  }
+}