You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/05/10 09:05:45 UTC
ignite git commit: IGNITE-8422: Zookeeper discovery split brain
detection shouldn't consider client nodes. This closes #3951.
Repository: ignite
Updated Branches:
refs/heads/master 07cbe22da -> 534fcec82
IGNITE-8422: Zookeeper discovery split brain detection shouldn't consider client nodes. This closes #3951.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/534fcec8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/534fcec8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/534fcec8
Branch: refs/heads/master
Commit: 534fcec827bf7a95f100a2f1fbacb908b9d88878
Parents: 07cbe22
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Thu May 10 12:05:37 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu May 10 12:05:37 2018 +0300
----------------------------------------------------------------------
.../DefaultCommunicationFailureResolver.java | 367 +++++++++---------
.../internal/cluster/graph/BitSetIterator.java | 66 ++++
.../internal/cluster/graph/ClusterGraph.java | 207 ++++++++++
.../graph/FullyConnectedComponentSearcher.java | 341 +++++++++++++++++
.../communication/tcp/TcpCommunicationSpi.java | 82 ++--
.../FullyConnectedComponentSearcherTest.java | 323 ++++++++++++++++
.../zk/internal/ZookeeperDiscoverySpiTest.java | 379 +++++++++++++++++++
7 files changed, 1539 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/534fcec8/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
index a4c6da9..9ccadf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
@@ -18,13 +18,21 @@
package org.apache.ignite.configuration;
import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.graph.BitSetIterator;
+import org.apache.ignite.internal.cluster.graph.ClusterGraph;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Default Communication Failure Resolver.
@@ -36,266 +44,239 @@ public class DefaultCommunicationFailureResolver implements CommunicationFailure
/** {@inheritDoc} */
@Override public void resolve(CommunicationFailureContext ctx) {
- ClusterGraph graph = new ClusterGraph(log, ctx);
+ ClusterPart largestCluster = findLargestConnectedCluster(ctx);
- ClusterSearch cluster = graph.findLargestIndependentCluster();
+ if (largestCluster == null)
+ return;
- List<ClusterNode> nodes = ctx.topologySnapshot();
+ log.info("Communication problem resolver found fully connected independent cluster ["
+ + "serverNodesCnt=" + largestCluster.srvNodesCnt + ", "
+ + "clientNodesCnt=" + largestCluster.connectedClients.size() + ", "
+ + "totalAliveNodes=" + ctx.topologySnapshot().size() + ", "
+ + "serverNodesIds=" + clusterNodeIds(largestCluster.srvNodesSet, ctx.topologySnapshot(), 1000) + "]");
- assert nodes.size() > 0;
- assert cluster != null;
-
- if (graph.checkFullyConnected(cluster.nodesBitSet)) {
- assert cluster.nodeCnt <= nodes.size();
-
- if (cluster.nodeCnt < nodes.size()) {
- if (log.isInfoEnabled()) {
- log.info("Communication problem resolver found fully connected independent cluster [" +
- "clusterSrvCnt=" + cluster.srvCnt +
- ", clusterTotalNodes=" + cluster.nodeCnt +
- ", totalAliveNodes=" + nodes.size() + "]");
- }
-
- for (int i = 0; i < nodes.size(); i++) {
- if (!cluster.nodesBitSet.get(i))
- ctx.killNode(nodes.get(i));
- }
- }
- else
- U.warn(log, "All alive nodes are fully connected, this should be resolved automatically.");
- }
- else {
- if (log.isInfoEnabled()) {
- log.info("Communication problem resolver failed to find fully connected independent cluster.");
- }
- }
+ keepCluster(ctx, largestCluster);
}
/**
- * @param cluster Cluster nodes mask.
- * @param nodes Nodes.
- * @param limit IDs limit.
- * @return Cluster node IDs string.
+ * Finds largest part of the cluster where each node is able to connect to each other.
+ *
+ * @param ctx Communication failure context.
+ * @return Largest part of the cluster nodes to keep.
*/
- private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) {
- int startIdx = 0;
+ @Nullable private ClusterPart findLargestConnectedCluster(CommunicationFailureContext ctx) {
+ List<ClusterNode> srvNodes = ctx.topologySnapshot()
+ .stream()
+ .filter(node -> !CU.clientNode(node))
+ .collect(Collectors.toList());
- StringBuilder builder = new StringBuilder();
+ // Exclude client nodes from analysis.
+ ClusterGraph graph = new ClusterGraph(ctx, CU::clientNode);
- int cnt = 0;
+ List<BitSet> components = graph.findConnectedComponents();
- for (;;) {
- int idx = cluster.nextSetBit(startIdx);
+ if (components.isEmpty()) {
+ U.warn(log, "Unable to find at least one alive server node in the cluster " + ctx);
- if (idx == -1)
- break;
+ return null;
+ }
- startIdx = idx + 1;
+ if (components.size() == 1) {
+ BitSet nodesSet = components.get(0);
+ int nodeCnt = nodesSet.cardinality();
- if (builder.length() == 0) {
- builder.append('[');
+ boolean fullyConnected = graph.checkFullyConnected(nodesSet);
+
+ if (fullyConnected && nodeCnt == srvNodes.size()) {
+ U.warn(log, "All alive nodes are fully connected, this should be resolved automatically.");
+
+ return null;
}
- else
- builder.append(", ");
- builder.append(nodes.get(idx).id());
+ if (log.isInfoEnabled())
+ log.info("Communication problem resolver detected partial lost for some connections inside cluster. "
+ + "Will keep largest set of healthy fully-connected nodes. Other nodes will be killed forcibly.");
- if (cnt++ > limit)
- builder.append(", ...");
+ BitSet fullyConnectedPart = graph.findLargestFullyConnectedComponent(nodesSet);
+ Set<ClusterNode> connectedClients = findConnectedClients(ctx, fullyConnectedPart);
+
+ return new ClusterPart(fullyConnectedPart, connectedClients);
}
- builder.append(']');
+ // If the cluster has split into several parts and at least 2 of them contain more than one node,
+ // it means that split brain has happened. BitSet.size() is allocated capacity, so count set bits instead.
+ boolean isSplitBrain = components.size() > 1 &&
+ components.stream().filter(cmp -> cmp.cardinality() > 1).count() > 1;
- return builder.toString();
- }
+ if (isSplitBrain)
+ U.warn(log, "Communication problem resolver detected split brain. "
+ + "Cluster has splitted on " + components.size() + " independent parts. "
+ + "Will keep only one largest fully-connected part. "
+ + "Other nodes will be killed forcibly.");
+ else
+ U.warn(log, "Communication problem resolver detected full lost for some connections inside cluster. "
+ + "Problem nodes will be found and killed forcibly.");
- /**
- *
- */
- private static class ClusterSearch {
- /** */
- int srvCnt;
+ // For each part of the split cluster extract the largest fully-connected component.
+ ClusterPart largestCluster = null;
+ for (int i = 0; i < components.size(); i++) {
+ BitSet clusterPart = components.get(i);
- /** */
- int nodeCnt;
+ BitSet fullyConnectedPart = graph.findLargestFullyConnectedComponent(clusterPart);
+ Set<ClusterNode> connectedClients = findConnectedClients(ctx, fullyConnectedPart);
- /** */
- final BitSet nodesBitSet;
+ ClusterPart curr = new ClusterPart(fullyConnectedPart, connectedClients);
- /**
- * @param nodes Total nodes.
- */
- ClusterSearch(int nodes) {
- nodesBitSet = new BitSet(nodes);
+ if (largestCluster == null || curr.compareTo(largestCluster) > 0)
+ largestCluster = curr;
}
+
+ assert largestCluster != null
+ : "Unable to find at least one alive independent cluster.";
+
+ return largestCluster;
}
/**
+ * Keeps server cluster nodes presented in given {@code srvNodesSet}.
+ * Client nodes which have connections to presented {@code srvNodesSet} will also be kept.
+ * Other nodes will be killed forcibly.
*
+ * @param ctx Communication failure context.
+ * @param clusterPart Set of nodes that need to be kept in the cluster.
*/
- private static class ClusterGraph {
- /** */
- private final static int WORD_IDX_SHIFT = 6;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final int nodeCnt;
-
- /** */
- private final long[] visitBitSet;
-
- /** */
- private final CommunicationFailureContext ctx;
-
- /** */
- private final List<ClusterNode> nodes;
+ private void keepCluster(CommunicationFailureContext ctx, ClusterPart clusterPart) {
+ List<ClusterNode> allNodes = ctx.topologySnapshot();
- /**
- * @param log Logger.
- * @param ctx Context.
- */
- ClusterGraph(IgniteLogger log, CommunicationFailureContext ctx) {
- this.log = log;
- this.ctx = ctx;
+ // Kill server nodes.
+ for (int idx = 0; idx < allNodes.size(); idx++) {
+ ClusterNode node = allNodes.get(idx);
- nodes = ctx.topologySnapshot();
+ // Client nodes will be processed separately.
+ if (CU.clientNode(node))
+ continue;
- nodeCnt = nodes.size();
+ if (!clusterPart.srvNodesSet.get(idx))
+ ctx.killNode(node);
+ }
- assert nodeCnt > 0;
+ // Kill client nodes unable to connect to the presented part of cluster.
+ for (int idx = 0; idx < allNodes.size(); idx++) {
+ ClusterNode node = allNodes.get(idx);
- visitBitSet = initBitSet(nodeCnt);
+ if (CU.clientNode(node) && !clusterPart.connectedClients.contains(node))
+ ctx.killNode(node);
}
+ }
- /**
- * @param bitIndex Bit index.
- * @return Word index containing bit with given index.
- */
- private static int wordIndex(int bitIndex) {
- return bitIndex >> WORD_IDX_SHIFT;
- }
+ /**
+ * Finds set of the client nodes which are able to connect to given set of server nodes {@code srvNodesSet}.
+ *
+ * @param ctx Communication failure context.
+ * @param srvNodesSet Server nodes set.
+ * @return Set of client nodes.
+ */
+ private Set<ClusterNode> findConnectedClients(CommunicationFailureContext ctx, BitSet srvNodesSet) {
+ Set<ClusterNode> connectedClients = new HashSet<>();
- /**
- * @param bitCnt Number of bits.
- * @return Bit set words.
- */
- static long[] initBitSet(int bitCnt) {
- return new long[wordIndex(bitCnt - 1) + 1];
- }
+ List<ClusterNode> allNodes = ctx.topologySnapshot();
- /**
- * @return Cluster nodes bit set.
- */
- ClusterSearch findLargestIndependentCluster() {
- ClusterSearch maxCluster = null;
+ for (ClusterNode node : allNodes) {
+ if (!CU.clientNode(node))
+ continue;
- for (int i = 0; i < nodeCnt; i++) {
- if (getBit(visitBitSet, i))
- continue;
+ boolean hasConnections = true;
- ClusterSearch cluster = new ClusterSearch(nodeCnt);
+ Iterator<Integer> it = new BitSetIterator(srvNodesSet);
+ while (it.hasNext()) {
+ int srvNodeIdx = it.next();
+ ClusterNode srvNode = allNodes.get(srvNodeIdx);
- search(cluster, i);
+ if (!ctx.connectionAvailable(node, srvNode) || !ctx.connectionAvailable(srvNode, node)) {
+ hasConnections = false;
- if (log.isInfoEnabled()) {
- log.info("Communication problem resolver found cluster [srvCnt=" + cluster.srvCnt +
- ", totalNodeCnt=" + cluster.nodeCnt +
- ", nodeIds=" + clusterNodeIds(cluster.nodesBitSet, nodes, 1000) + "]");
+ break;
}
-
- if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt)
- maxCluster = cluster;
}
- return maxCluster;
+ if (hasConnections)
+ connectedClients.add(node);
}
- /**
- * @param cluster Cluster nodes bit set.
- * @return {@code True} if all cluster nodes are able to connect to each other.
- */
- boolean checkFullyConnected(BitSet cluster) {
- int startIdx = 0;
-
- int clusterNodes = cluster.cardinality();
-
- for (;;) {
- int idx = cluster.nextSetBit(startIdx);
+ return connectedClients;
+ }
- if (idx == -1)
- break;
+ /**
+ * Class representing part of cluster.
+ */
+ private static class ClusterPart implements Comparable<ClusterPart> {
+ /** Server nodes count. */
+ int srvNodesCnt;
- ClusterNode node1 = nodes.get(idx);
+ /** Server nodes set. */
+ BitSet srvNodesSet;
- for (int i = 0; i < clusterNodes; i++) {
- if (!cluster.get(i) || i == idx)
- continue;
+ /** Set of client nodes that are able to connect to the presented part of server nodes. */
+ Set<ClusterNode> connectedClients;
- ClusterNode node2 = nodes.get(i);
+ /**
+ * Constructor.
+ *
+ * @param srvNodesSet Server nodes set.
+ * @param connectedClients Set of client nodes.
+ */
+ public ClusterPart(BitSet srvNodesSet, Set<ClusterNode> connectedClients) {
+ this.srvNodesSet = srvNodesSet;
+ this.srvNodesCnt = srvNodesSet.cardinality();
+ this.connectedClients = connectedClients;
+ }
- if (cluster.get(i) && !ctx.connectionAvailable(node1, node2))
- return false;
- }
+ /** {@inheritDoc} */
+ @Override public int compareTo(@NotNull ClusterPart o) {
+ int srvNodesCmp = Integer.compare(srvNodesCnt, o.srvNodesCnt);
- startIdx = idx + 1;
- }
+ if (srvNodesCmp != 0)
+ return srvNodesCmp;
- return true;
+ return Integer.compare(connectedClients.size(), o.connectedClients.size());
}
+ }
- /**
- * @param cluster Current cluster bit set.
- * @param idx Node index.
- */
- void search(ClusterSearch cluster, int idx) {
- assert !getBit(visitBitSet, idx);
-
- setBit(visitBitSet, idx);
-
- cluster.nodesBitSet.set(idx);
- cluster.nodeCnt++;
+ /**
+ * @param cluster Cluster nodes mask.
+ * @param nodes Nodes.
+ * @param limit IDs limit.
+ * @return Cluster node IDs string.
+ */
+ private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) {
+ int startIdx = 0;
- ClusterNode node1 = nodes.get(idx);
+ StringBuilder builder = new StringBuilder();
- if (!CU.clientNode(node1))
- cluster.srvCnt++;
+ int cnt = 0;
- for (int i = 0; i < nodeCnt; i++) {
- if (i == idx || getBit(visitBitSet, i))
- continue;
+ for (;;) {
+ int idx = cluster.nextSetBit(startIdx);
- ClusterNode node2 = nodes.get(i);
+ if (idx == -1)
+ break;
- boolean connected = ctx.connectionAvailable(node1, node2) ||
- ctx.connectionAvailable(node2, node1);
+ startIdx = idx + 1;
- if (connected)
- search(cluster, i);
- }
- }
+ if (builder.length() == 0)
+ builder.append('[');
+ else
+ builder.append(", ");
- /**
- * @param words Bit set words.
- * @param bitIndex Bit index.
- */
- static void setBit(long words[], int bitIndex) {
- int wordIndex = wordIndex(bitIndex);
+ builder.append(nodes.get(idx).id());
- words[wordIndex] |= (1L << bitIndex);
+ if (cnt++ > limit)
+ builder.append(", ...");
}
- /**
- * @param words Bit set words.
- * @param bitIndex Bit index.
- * @return Bit value.
- */
- static boolean getBit(long[] words, int bitIndex) {
- int wordIndex = wordIndex(bitIndex);
+ builder.append(']');
- return (words[wordIndex] & (1L << bitIndex)) != 0;
- }
+ return builder.toString();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/534fcec8/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/BitSetIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/BitSetIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/BitSetIterator.java
new file mode 100644
index 0000000..3a5cf9f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/BitSetIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.graph;
+
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator over set bits in {@link BitSet}.
+ */
+public class BitSetIterator implements Iterator<Integer> {
+ /** Bitset. */
+ private final BitSet bitSet;
+
+ /** Current index. */
+ private int idx = -1;
+
+ /**
+ * @param bitSet Bitset.
+ */
+ public BitSetIterator(BitSet bitSet) {
+ this.bitSet = bitSet;
+
+ advance();
+ }
+
+ /**
+ * Find index of the next set bit.
+ */
+ private void advance() {
+ idx = bitSet.nextSetBit(idx + 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return idx != -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer next() throws NoSuchElementException {
+ if (idx == -1)
+ throw new NoSuchElementException();
+
+ int res = idx;
+
+ advance();
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/534fcec8/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/ClusterGraph.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/ClusterGraph.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/ClusterGraph.java
new file mode 100644
index 0000000..ba56c33
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/ClusterGraph.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.graph;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CommunicationFailureContext;
+
+/**
+ * Class to represent cluster nodes available connections as graph.
+ * Provides several graph algorithms to analyze cluster nodes connections.
+ */
+public class ClusterGraph {
+ /** Number of all cluster nodes. */
+ private final int nodeCnt;
+
+ /** List of the all cluster nodes. */
+ private final List<ClusterNode> nodes;
+
+ /** Connectivity (adjacency) matrix between cluster nodes. */
+ private final BitSet[] connections;
+
+ /** Fully-connected component searcher. */
+ private final FullyConnectedComponentSearcher fccSearcher;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Communication failure context.
+ * @param nodeFilterOut Filter to exclude some cluster nodes from graph.
+ */
+ public ClusterGraph(CommunicationFailureContext ctx, Predicate<ClusterNode> nodeFilterOut) {
+ nodes = ctx.topologySnapshot();
+
+ nodeCnt = nodes.size();
+
+ assert nodeCnt > 0;
+
+ connections = buildConnectivityMatrix(ctx, nodeFilterOut);
+
+ fccSearcher = new FullyConnectedComponentSearcher(connections);
+ }
+
+ /**
+ * Builds connectivity matrix (adjacency matrix) for all cluster nodes.
+ *
+ * @param ctx Communication failure context.
+ * @param nodeFilterOut Filter to exclude some cluster nodes from graph.
+ * @return Connections bit set for each node, where set bit means available connection.
+ */
+ private BitSet[] buildConnectivityMatrix(CommunicationFailureContext ctx, Predicate<ClusterNode> nodeFilterOut) {
+ BitSet[] connections = new BitSet[nodeCnt];
+
+ for (int i = 0; i < nodeCnt; i++) {
+ ClusterNode node = nodes.get(i);
+
+ if (nodeFilterOut.test(node)) {
+ connections[i] = null;
+ continue;
+ }
+
+ connections[i] = new BitSet(nodeCnt);
+ for (int j = 0; j < nodeCnt; j++) {
+ ClusterNode to = nodes.get(j);
+
+ if (nodeFilterOut.test(to))
+ continue;
+
+ if (i == j || ctx.connectionAvailable(node, to))
+ connections[i].set(j);
+ }
+ }
+
+ // Remove unidirectional connections (node A can connect to B, but B can't connect to A).
+ for (int i = 0; i < nodeCnt; i++)
+ for (int j = i + 1; j < nodeCnt; j++) {
+ if (connections[i] == null || connections[j] == null)
+ continue;
+
+ if (connections[i].get(j) ^ connections[j].get(i)) {
+ connections[i].set(j, false);
+ connections[j].set(i, false);
+ }
+ }
+
+ return connections;
+ }
+
+ /**
+ * Finds connected components in cluster graph.
+ *
+ * @return List of set of nodes, each set represents connected component.
+ */
+ public List<BitSet> findConnectedComponents() {
+ List<BitSet> connectedComponets = new ArrayList<>();
+
+ BitSet visitSet = new BitSet(nodeCnt);
+
+ for (int i = 0; i < nodeCnt; i++) {
+ if (visitSet.get(i) || connections[i] == null)
+ continue;
+
+ BitSet currComponent = new BitSet(nodeCnt);
+
+ dfs(i, currComponent, visitSet);
+
+ connectedComponets.add(currComponent);
+ }
+
+ return connectedComponets;
+ }
+
+ /**
+ * Depth-first search to find connected components in connections graph.
+ *
+ * @param nodeIdx Current node index to traverse from.
+ * @param currComponent Current connected component to populate.
+ * @param allVisitSet Set of the visited nodes in whole graph during traversal.
+ */
+ private void dfs(int nodeIdx, BitSet currComponent, BitSet allVisitSet) {
+ assert !allVisitSet.get(nodeIdx)
+ : "Incorrect node visit " + nodeIdx;
+
+ assert connections[nodeIdx] != null
+ : "Incorrect node visit. Node has not passed filter " + nodes.get(nodeIdx);
+
+ allVisitSet.set(nodeIdx);
+
+ currComponent.set(nodeIdx);
+
+ for (int toIdx = 0; toIdx < nodeCnt; toIdx++) {
+ if (toIdx == nodeIdx || allVisitSet.get(toIdx) || connections[toIdx] == null)
+ continue;
+
+ boolean connected = connections[nodeIdx].get(toIdx) && connections[toIdx].get(nodeIdx);
+
+ if (connected)
+ dfs(toIdx, currComponent, allVisitSet);
+ }
+ }
+
+ /**
+ * Finds largest fully-connected component from given {@code nodesSet}.
+ *
+ * @param nodesSet Set of nodes.
+ * @return Set of nodes which forms largest fully-connected component.
+ */
+ public BitSet findLargestFullyConnectedComponent(BitSet nodesSet) {
+ // Check that current set is already fully connected.
+ boolean fullyConnected = checkFullyConnected(nodesSet);
+
+ if (fullyConnected)
+ return nodesSet;
+
+ BitSet res = fccSearcher.findLargest(nodesSet);
+
+ assert checkFullyConnected(res)
+ : "Not fully connected component was found [result=" + res + ", nodesSet=" + nodesSet + "]";
+
+ return res;
+ }
+
+ /**
+ * Checks that given {@code nodesSet} forms fully-connected component.
+ *
+ * @param nodesSet Set of cluster nodes.
+ * @return {@code True} if all given cluster nodes are able to connect to each other.
+ */
+ public boolean checkFullyConnected(BitSet nodesSet) {
+ int maxIdx = nodesSet.length();
+
+ Iterator<Integer> it = new BitSetIterator(nodesSet);
+
+ while (it.hasNext()) {
+ int idx = it.next();
+
+ for (int i = 0; i < maxIdx; i++) {
+ if (i == idx)
+ continue;
+
+ if (nodesSet.get(i) && !connections[idx].get(i))
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/534fcec8/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/FullyConnectedComponentSearcher.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/FullyConnectedComponentSearcher.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/FullyConnectedComponentSearcher.java
new file mode 100644
index 0000000..9a8098e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/graph/FullyConnectedComponentSearcher.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.graph;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.Iterator;
+
+/**
+ * Class to find (possibly) largest fully-connected component (also can be called as <b>complete subgraph</b>) in graph.
+ * This problem is also known as <b>Clique problem</b> which is NP-complete.
+ *
+ * For small number of nodes simple brute-force algorithm is used which finds such component guaranteed.
+ * For large number of nodes some sort of greedy heuristic is used which works well for real-life scenarios
+ * but doesn't guarantee to find largest component, however very close to ideal result.
+ */
+public class FullyConnectedComponentSearcher {
+ /** The maximal number of nodes when bruteforce algorithm will be used. */
+ private static final int BRUTE_FORCE_THRESHOULD = 24;
+
+ /** Number of nodes in connections graph. */
+ private final int totalNodesCnt;
+
+ /** Adjacency matrix. */
+ private final BitSet[] connections;
+
+ /**
+ * Constructor.
+ *
+ * @param connections Adjacency matrix.
+ */
+ public FullyConnectedComponentSearcher(BitSet[] connections) {
+ this.connections = connections;
+ totalNodesCnt = connections.length;
+ }
+
+ /**
+ * Find largest fully connected component from presented set of the nodes {@code where}.
+ *
+ * @param where Set of nodes where fully connected component must be found.
+ * @return Set of nodes forming fully connected component.
+ */
+ public BitSet findLargest(BitSet where) {
+ int nodesCnt = where.cardinality();
+
+ if (nodesCnt <= BRUTE_FORCE_THRESHOULD)
+ return bruteforce(nodesCnt, where);
+
+ // Return best of the 2 heuristics.
+ BitSet e1 = heuristic1(where);
+ BitSet e2 = heuristic2(where);
+
+ return e1.cardinality() > e2.cardinality() ? e1 : e2;
+ }
+
+ /**
+ * Extract node indexes (set bits) from given {@code selectedSet} to integer array.
+ *
+ * @param selectedNodesCnt Number of nodes.
+ * @param selectedSet Set of nodes.
+ * @return Arrays which contains node indexes.
+ */
+ private Integer[] extractNodeIndexes(int selectedNodesCnt, BitSet selectedSet) {
+ Integer[] indexes = new Integer[selectedNodesCnt];
+ Iterator<Integer> it = new BitSetIterator(selectedSet);
+ int i = 0;
+
+ while (it.hasNext())
+ indexes[i++] = it.next();
+
+ assert i == indexes.length
+ : "Extracted not all indexes [nodesCnt=" + selectedNodesCnt + ", extracted=" + i + ", set=" + selectedSet + "]";
+
+ return indexes;
+ }
+
+ /**
+ * Sorts nodes using {@link ConnectionsComparator}
+ * and runs greedy algorithm {@link #greedyIterative(int, Integer[])} on it.
+ *
+ * @param selectedSet Set of nodes used to form fully-connected component.
+ * @return Subset of given {@code selectedSet} which forms fully connected component.
+ */
+ private BitSet heuristic1(BitSet selectedSet) {
+ int selectedNodesCnt = selectedSet.cardinality();
+ Integer[] nodeIndexes = extractNodeIndexes(selectedNodesCnt, selectedSet);
+
+ Arrays.sort(nodeIndexes, new ConnectionsComparator(totalNodesCnt));
+
+ return greedyIterative(selectedNodesCnt, nodeIndexes);
+ }
+
+ /**
+ * Exactly the same thing as in {@link #heuristic1(BitSet)} but using reversed {@link ConnectionsComparator}.
+ *
+ * @param selectedSet Set of nodes used to form fully-connected component.
+ * @return Subset of given {@code selectedSet} which forms fully connected component.
+ */
+ private BitSet heuristic2(BitSet selectedSet) {
+ int selectedNodesCnt = selectedSet.cardinality();
+ Integer[] nodeIndexes = extractNodeIndexes(selectedNodesCnt, selectedSet);
+
+ Arrays.sort(nodeIndexes, new ConnectionsComparator(totalNodesCnt).reversed());
+
+ return greedyIterative(selectedNodesCnt, nodeIndexes);
+ }
+
+ /**
+ * Finds fully-connected component between given {@code nodeIndexes} and tries to maximize size of it.
+ *
+ * The main idea of the algorithm is that after specific sorting,
+ * nodes able to form fully-connected will be placed closer to each other in given {@code nodeIndexes} array.
+ * While nodes not able to form will be placed further.
+ *
+ * At the beginning of the algorithm we form a global set of nodes that can be used to form a fully-connected component.
+ * We iterate over this set and try to add each node to current fully-connected component, which is empty at the beginning.
+ *
+ * When we add node to the component we need to check that after adding new component is also fully-connected.
+ * See {@link #joinNode(BitSet, int, Integer[])}.
+ *
+ * After end of iteration we exclude nodes which formed fully-connected from the global set and run iteration again and again
+ * on remaining nodes, while the global set will not be empty.
+ *
+ * Complexity is O(N^2), where N is number of nodes.
+ *
+ * @param selectedNodesCnt Number of nodes.
+ * @param nodeIndexes Node indexes used to form fully-connected component.
+ * @return Subset of given {@code nodeIndexes} which forms fully connected component.
+ */
+ private BitSet greedyIterative(int selectedNodesCnt, Integer[] nodeIndexes) {
+ // Set of the nodes which can be used to form fully connected component.
+ BitSet canUse = new BitSet(selectedNodesCnt);
+ for (int i = 0; i < selectedNodesCnt; i++)
+ canUse.set(i);
+
+ BitSet bestRes = null;
+
+ while (!canUse.isEmpty()) {
+ // Even if we pick all possible nodes, their size will not be greater than current best result.
+ // No needs to run next iteration in this case.
+ if (bestRes != null && canUse.cardinality() <= bestRes.cardinality())
+ break;
+
+ BitSet currRes = new BitSet(selectedNodesCnt);
+
+ Iterator<Integer> canUseIter = new BitSetIterator(canUse);
+ while (canUseIter.hasNext()) {
+ /* Try to add node to the current set that forms fully connected component.
+ Node will be skipped if after adding, current set loses full connectivity. */
+ int pickedIdx = canUseIter.next();
+
+ if (joinNode(currRes, pickedIdx, nodeIndexes)) {
+ currRes.set(pickedIdx);
+ canUse.set(pickedIdx, false);
+ }
+ }
+
+ if (bestRes == null || currRes.cardinality() > bestRes.cardinality())
+ bestRes = currRes;
+ }
+
+ // Try to improve our best result, if it was formed on second or next iteration.
+ for (int nodeIdx = 0; nodeIdx < selectedNodesCnt; nodeIdx++)
+ if (!bestRes.get(nodeIdx) && joinNode(bestRes, nodeIdx, nodeIndexes))
+ bestRes.set(nodeIdx);
+
+ // Replace relative node indexes (used in indexes) to absolute node indexes (used in whole graph connections).
+ BitSet reindexedBestRes = new BitSet(totalNodesCnt);
+ Iterator<Integer> it = new BitSetIterator(bestRes);
+ while (it.hasNext())
+ reindexedBestRes.set(nodeIndexes[it.next()]);
+
+ return reindexedBestRes;
+ }
+
+ /**
+ * Tests whether the node with relative index {@code nodeIdx} is connected to every member of
+ * {@code currComponent}, i.e. whether the component stays fully-connected once the node is added.
+ * The component itself is not modified by this method.
+ *
+ * @param currComponent Current fully-connected component (relative node indexes).
+ * @param nodeIdx Node relative index.
+ * @param nodeIndexes Node absolute indexes, addressed by relative index.
+ * @return {@code True} if given node can be joined to {@code currComponent}.
+ */
+ private boolean joinNode(BitSet currComponent, int nodeIdx, Integer[] nodeIndexes) {
+ for (Iterator<Integer> members = new BitSetIterator(currComponent); members.hasNext(); ) {
+ int memberIdx = members.next();
+
+ // A single missing link to an existing member is enough to reject the candidate.
+ if (!connections[nodeIndexes[nodeIdx]].get(nodeIndexes[memberIdx]))
+ return false;
+ }
+
+ // Every member (possibly none) is connected to the candidate.
+ return true;
+ }
+
+ /**
+ * Simple bruteforce implementation which works in O(2^N * N^2), where N is number of nodes.
+ *
+ * Every non-empty subset of the selected nodes is encoded as an {@code int} bit mask. Masks are visited from the
+ * largest value down, and a subset replaces the current result only if it is fully connected and has strictly
+ * greater cardinality.
+ *
+ * @param selectedNodesCnt Nodes count. Must be less than {@link Integer#SIZE}, because subsets are stored in an {@code int}.
+ * @param selectedSet Set of nodes.
+ * @return Subset of given {@code selectedSet} of nodes which forms fully connected component.
+ */
+ private BitSet bruteforce(int selectedNodesCnt, BitSet selectedSet) {
+ // With 32 or more nodes "1 << selectedNodesCnt" wraps around and the search would silently return a wrong result.
+ assert selectedNodesCnt < Integer.SIZE
+ : "Too many nodes for bruteforce search [selectedNodesCnt=" + selectedNodesCnt + "]";
+
+ Integer[] indexes = extractNodeIndexes(selectedNodesCnt, selectedSet);
+
+ int resMask = -1;
+ int maxCardinality = -1;
+
+ // Iterate over all possible combinations of used nodes.
+ for (int mask = (1 << selectedNodesCnt) - 1; mask > 0; mask--) {
+ int cardinality = Integer.bitCount(mask);
+
+ // This subset cannot improve the result, no need to check its connectivity.
+ if (cardinality <= maxCardinality)
+ continue;
+
+ // Check that selected set of nodes forms fully connected component.
+ // The pair (i, i) is checked as well, so a node is expected to be marked as connected to itself.
+ boolean fullyConnected = true;
+
+ for (int i = 0; i < selectedNodesCnt && fullyConnected; i++)
+ if ((mask & (1 << i)) != 0)
+ for (int j = 0; j < selectedNodesCnt; j++)
+ if ((mask & (1 << j)) != 0) {
+ boolean connected = connections[indexes[i]].get(indexes[j]);
+
+ if (!connected) {
+ fullyConnected = false;
+
+ // Leaves the inner loop only; the outer loop stops through its own condition.
+ break;
+ }
+ }
+
+ if (fullyConnected) {
+ resMask = mask;
+ maxCardinality = cardinality;
+ }
+ }
+
+ BitSet resSet = new BitSet(selectedNodesCnt);
+
+ // Translate bits of the winning mask back to node indexes stored in 'indexes'.
+ for (int i = 0; i < selectedNodesCnt; i++) {
+ if ((resMask & (1 << i)) != 0) {
+ int idx = indexes[i];
+
+ assert selectedSet.get(idx)
+ : "Result contains node which is not presented in income set [nodeIdx=" + idx + ", set=" + selectedSet + "]";
+
+ resSet.set(idx);
+ }
+ }
+
+ assert resSet.cardinality() > 0
+ : "No nodes selected as fully connected component [set=" + selectedSet + "]";
+
+ return resSet;
+ }
+
+ /**
+ * Comparator which sorts nodes by their connections array.
+ *
+ * Suppose you have connections matrix:
+ *
+ * 1111
+ * 1101
+ * 1010
+ * 1101
+ *
+ * Each connection row associated with some node.
+ * Comparator will sort node indexes using their connection rows as very big binary numbers, as in example:
+ *
+ * 1111
+ * 1101
+ * 1101
+ * 1010
+ *
+ * Note: Comparator sorts only node indexes; the actual connection rows are not swapped.
+ */
+ private class ConnectionsComparator implements Comparator<Integer> {
+ /** Cache each connection long array representation, to avoid creating new object for each comparison. */
+ private final long[][] cachedConnRows;
+
+ /**
+ * Constructor
+ * @param totalNodesCnt Total number of nodes in the cluster.
+ */
+ ConnectionsComparator(int totalNodesCnt) {
+ cachedConnRows = new long[totalNodesCnt][];
+ }
+
+ /**
+ * Returns long array representation of connection row for given node {@code idx}.
+ *
+ * @param idx Node index.
+ * @return Long array connection row representation.
+ */
+ private long[] connectionRow(int idx) {
+ if (cachedConnRows[idx] != null)
+ return cachedConnRows[idx];
+
+ return cachedConnRows[idx] = connections[idx].toLongArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(Integer node1, Integer node2) {
+ long[] conn1 = connectionRow(node1);
+ long[] conn2 = connectionRow(node2);
+
+ int len = Math.min(conn1.length, conn2.length);
+ for (int i = 0; i < len; i++) {
+ int res = Long.compare(conn1[i], conn2[i]);
+
+ if (res != 0)
+ return res;
+ }
+
+ return conn1.length - conn2.length;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/534fcec8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index df37dff..f9fd6fd 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3442,50 +3442,66 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
break;
}
- if (client == null) {
- assert errs != null;
+ if (client == null)
+ processClientCreationError(node, addrs, errs);
- if (X.hasCause(errs, ConnectException.class))
- LT.warn(log, "Failed to connect to a remote node " +
- "(make sure that destination node is alive and " +
- "operating system firewall is disabled on local and remote hosts) " +
- "[addrs=" + addrs + ']');
+ return client;
+ }
+
+ /**
+ * Process errors if TCP client to remote node hasn't been created.
+ *
+ * @param node Remote node.
+ * @param addrs Remote node addresses.
+ * @param errs TCP client creation errors.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void processClientCreationError(
+ ClusterNode node,
+ Collection<InetSocketAddress> addrs,
+ IgniteCheckedException errs
+ ) throws IgniteCheckedException {
+ assert errs != null;
- boolean commErrResolve = false;
+ if (X.hasCause(errs, ConnectException.class))
+ LT.warn(log, "Failed to connect to a remote node " +
+ "(make sure that destination node is alive and " +
+ "operating system firewall is disabled on local and remote hosts) " +
+ "[addrs=" + addrs + ']');
- IgniteSpiContext ctx = getSpiContext();
+ boolean commErrResolve = false;
- if (connectionError(errs) && ctx.communicationFailureResolveSupported()) {
- commErrResolve = true;
+ IgniteSpiContext ctx = getSpiContext();
- ctx.resolveCommunicationFailure(node, errs);
- }
+ if (connectionError(errs) && ctx.communicationFailureResolveSupported()) {
+ commErrResolve = true;
- if (!commErrResolve && enableForcibleNodeKill) {
- if (ctx.node(node.id()) != null
- && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
- connectionError(errs)) {
- String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
- "cluster [" + "rmtNode=" + node + ']';
+ ctx.resolveCommunicationFailure(node, errs);
+ }
- if (enableTroubleshootingLog)
- U.error(log, msg, errs);
- else
- U.warn(log, msg);
+ if (!commErrResolve && enableForcibleNodeKill) {
+ if (ctx.node(node.id()) != null
+ && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+ connectionError(errs)) {
+ String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
+ "cluster [" + "rmtNode=" + node + ']';
- ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
- "rmtNode=" + node +
- ", errs=" + errs +
- ", connectErrs=" + X.getSuppressedList(errs) + ']');
- }
- }
+ if (enableTroubleshootingLog)
+ U.error(log, msg, errs);
+ else
+ U.warn(log, msg);
- if (!X.hasCause(errs, SocketTimeoutException.class, HandshakeTimeoutException.class,
- IgniteSpiOperationTimeoutException.class))
- throw errs;
+ ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+ "rmtNode=" + node +
+ ", errs=" + errs +
+ ", connectErrs=" + X.getSuppressedList(errs) + ']');
+ }
}
- return client;
+ if (!X.hasCause(errs, SocketTimeoutException.class, HandshakeTimeoutException.class,
+ IgniteSpiOperationTimeoutException.class))
+ throw errs;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/534fcec8/modules/core/src/test/java/org/apache/ignite/internal/cluster/FullyConnectedComponentSearcherTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/cluster/FullyConnectedComponentSearcherTest.java b/modules/core/src/test/java/org/apache/ignite/internal/cluster/FullyConnectedComponentSearcherTest.java
new file mode 100644
index 0000000..d6680cf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/cluster/FullyConnectedComponentSearcherTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Random;
+import org.apache.ignite.internal.cluster.graph.FullyConnectedComponentSearcher;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Class to test correctness of fully-connected component searching algorithm.
+ */
+@RunWith(Parameterized.class)
+public class FullyConnectedComponentSearcherTest {
+ /** Adjacency matrix provider for each test. */
+ private AdjacencyMatrixProvider provider;
+
+ /** Minimum acceptable size of the fully-connected component for each test. */
+ private int minAcceptableRes;
+
+ /**
+ * @param provider Adjacency matrix.
+ * @param minAcceptableRes Expected result.
+ */
+ public FullyConnectedComponentSearcherTest(AdjacencyMatrixProvider provider, int minAcceptableRes) {
+ this.provider = provider;
+ this.minAcceptableRes = minAcceptableRes;
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testFind() {
+ BitSet[] matrix = provider.provide();
+
+ int nodes = matrix.length;
+
+ BitSet all = new BitSet(nodes);
+ for (int i = 0; i < nodes; i++)
+ all.set(i);
+
+ FullyConnectedComponentSearcher searcher = new FullyConnectedComponentSearcher(matrix);
+
+ BitSet res = searcher.findLargest(all);
+ int size = res.cardinality();
+
+ Assert.assertTrue("Actual = " + size + ", Expected = " + minAcceptableRes,
+ size >= minAcceptableRes);
+ }
+
+ /**
+ * @return Test dataset.
+ */
+ @Parameterized.Parameters(name = "{index}: search({0}) >= {1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {new StaticMatrix(new String[] {
+ "100",
+ "010",
+ "001",
+ }), 1},
+ {new StaticMatrix(new String[] {
+ "101",
+ "010",
+ "101",
+ }), 2},
+ {new StaticMatrix(new String[] {
+ "1101",
+ "1111",
+ "0110",
+ "1101",
+ }), 3},
+ {new StaticMatrix(new String[] {
+ "1111001",
+ "1111000",
+ "1111000",
+ "1111000",
+ "0000111",
+ "0000111",
+ "1000111",
+ }), 4},
+ {new AlmostSplittedMatrix(30, 100, 200), 200},
+ {new AlmostSplittedMatrix(500, 1000, 2000), 2000},
+ {new AlmostSplittedMatrix(1000, 2000, 3000), 3000},
+ {new AlmostSplittedMatrix(30, 22, 25, 33, 27), 33},
+ {new AlmostSplittedMatrix(1000, 400, 1000, 800), 1000},
+ {new SeveralConnectionsAreLostMatrix(200, 10), 190},
+ {new SeveralConnectionsAreLostMatrix(2000, 100), 1900},
+ {new SeveralConnectionsAreLostMatrix(2000, 500), 1500},
+ {new SeveralConnectionsAreLostMatrix(4000, 2000), 2000}
+ });
+ }
+
+ /**
+ * Provider for adjacency matrix for each test.
+ */
+ interface AdjacencyMatrixProvider {
+ /**
+ * @return Adjacency matrix.
+ */
+ BitSet[] provide();
+ }
+
+ /**
+ * Static graph represented as array of strings. Each cell (i, j) in such matrix means that there is connection
+ * between node(i) and node(j). Needed mostly to test bruteforce algorithm implementation.
+ */
+ static class StaticMatrix implements AdjacencyMatrixProvider {
+ /** Matrix. */
+ private final BitSet[] matrix;
+
+ /**
+ * @param strMatrix String matrix.
+ */
+ public StaticMatrix(@NotNull String[] strMatrix) {
+ A.ensure(strMatrix.length > 0, "Matrix should not be empty");
+ for (int i = 0; i < strMatrix.length; i++)
+ A.ensure(strMatrix[i].length() == strMatrix.length,
+ "Matrix should be quadratic. Problem row: " + i);
+
+ int nodes = strMatrix.length;
+
+ matrix = init(nodes);
+
+ for (int i = 0; i < nodes; i++)
+ for (int j = 0; j < nodes; j++)
+ matrix[i].set(j, strMatrix[i].charAt(j) == '1');
+ }
+
+ /** {@inheritDoc} */
+ @Override public BitSet[] provide() {
+ return matrix;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "StaticMatrix{" +
+ "matrix=" + Arrays.toString(matrix) +
+ '}';
+ }
+ }
+
+ /**
+ * A graph split into several isolated fully-connected components,
+ * but each such component has some connections to another one to keep the graph connected.
+ * Answer in this case should be max(Pi), where Pi is the size of each fully-connected component.
+ */
+ static class AlmostSplittedMatrix implements AdjacencyMatrixProvider {
+ /** Partition sizes. */
+ private final int[] partSizes;
+
+ /** Connections between parts. */
+ private final int connectionsBetweenParts;
+
+ /** Matrix. */
+ private final BitSet[] matrix;
+
+ /**
+ * Builds the matrix: every part is made fully connected inside, then {@code connectionsBetweenParts}
+ * bidirectional connections between randomly picked nodes (fixed seed) are added.
+ *
+ * @param connectionsBetweenParts Connections between parts.
+ * @param partSizes Partition sizes.
+ */
+ public AlmostSplittedMatrix(int connectionsBetweenParts, int... partSizes) {
+ // Validate the number of parts first: the connections check below depends on it.
+ A.ensure(partSizes.length >= 2, "There should be at least 2 parts of cluster");
+ A.ensure(connectionsBetweenParts >= 1 + partSizes.length, "There should be at least 1 connection between parts");
+ for (int i = 0; i < partSizes.length; i++)
+ A.ensure(partSizes[i] > 0, "Part size " + (i + 1) + " shouldn't be empty");
+
+ this.partSizes = partSizes.clone();
+ this.connectionsBetweenParts = connectionsBetweenParts;
+
+ int nodes = 0;
+ for (int i = 0; i < partSizes.length; i++)
+ nodes += partSizes[i];
+
+ matrix = init(nodes);
+
+ // Index of the first node of each part.
+ int[] startIdx = new int[partSizes.length];
+
+ for (int i = 0, total = 0; i < partSizes.length; i++) {
+ startIdx[i] = total;
+
+ // Make the part fully connected inside.
+ fillAll(matrix, total, total + partSizes[i]);
+
+ total += partSizes[i];
+ }
+
+ // Fixed seed: the same sequence of node pairs is produced on every run.
+ Random random = new Random(777);
+
+ // NOTE(review): part1 is never advanced, so every extra connection is created between
+ // part 0 and part 1 only - confirm whether rotating over all parts was intended.
+ for (int i = 0, part1 = 0; i < connectionsBetweenParts; i++) {
+ int part2 = (part1 + 1) % partSizes.length;
+
+ // Pick 2 random nodes from 2 parts and add connection between them.
+ int idx1 = random.nextInt(partSizes[part1]) + startIdx[part1];
+ int idx2 = random.nextInt(partSizes[part2]) + startIdx[part2];
+
+ matrix[idx1].set(idx2);
+ matrix[idx2].set(idx1);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public BitSet[] provide() {
+ return matrix;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "AlmostSplittedGraph{" +
+ "partSizes=" + Arrays.toString(partSizes) +
+ ", connectionsBetweenParts=" + connectionsBetweenParts +
+ '}';
+ }
+ }
+
+ /**
+ * Complete graph with several randomly chosen connections lost.
+ * In worst case each lost connection decreases potential size of maximal fully-connected component.
+ * So answer in this test case should be at least N - L, where N - nodes, L - lost connections.
+ */
+ static class SeveralConnectionsAreLostMatrix implements AdjacencyMatrixProvider {
+ /** Nodes. */
+ private final int nodes;
+
+ /** Lost connections. */
+ private final int lostConnections;
+
+ /** Matrix. */
+ private final BitSet[] matrix;
+
+ /**
+ * @param nodes Nodes.
+ * @param lostConnections Lost connections.
+ */
+ public SeveralConnectionsAreLostMatrix(int nodes, int lostConnections) {
+ A.ensure(nodes > 0, "There should be at least 1 node");
+
+ this.nodes = nodes;
+ this.lostConnections = lostConnections;
+
+ this.matrix = init(nodes);
+
+ fillAll(matrix, 0, nodes);
+
+ Random random = new Random(777);
+
+ for (int i = 0; i < lostConnections; i++) {
+ int idx1 = random.nextInt(nodes);
+ int idx2 = random.nextInt(nodes);
+
+ if (idx1 == idx2)
+ continue;
+
+ matrix[idx1].set(idx2, false);
+ matrix[idx2].set(idx1, false);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public BitSet[] provide() {
+ return matrix;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "SeveralConnectionsAreLost{" +
+ "nodes=" + nodes +
+ ", lostConnections=" + lostConnections +
+ '}';
+ }
+ }
+
+ /**
+ * Utility method to pre-create adjacency matrix.
+ *
+ * @param nodes Nodes in graph.
+ * @return Adjacency matrix.
+ */
+ private static BitSet[] init(int nodes) {
+ BitSet[] matrix = new BitSet[nodes];
+ for (int i = 0; i < nodes; i++)
+ matrix[i] = new BitSet(nodes);
+
+ return matrix;
+ }
+
+ /**
+ * Utility method to fill all connections between all nodes from {@code fromIdx} and {@code endIdx} exclusive.
+ *
+ * @param matrix Adjacency matrix.
+ * @param fromIdx Lower bound node index inclusive.
+ * @param endIdx Upper bound node index exclusive.
+ */
+ private static void fillAll(BitSet[] matrix, int fromIdx, int endIdx) {
+ for (int i = fromIdx; i < endIdx; i++)
+ for (int j = fromIdx; j < endIdx; j++) {
+ matrix[i].set(j);
+ matrix[j].set(i);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/534fcec8/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index 408af30..03b874d 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -17,8 +17,13 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -41,6 +46,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -84,6 +90,8 @@ import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
@@ -170,6 +178,9 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest {
private boolean testCommSpi;
/** */
+ private boolean failCommSpi;
+
+ /** */
private long sesTimeout;
/** */
@@ -335,6 +346,9 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest {
if (testCommSpi)
cfg.setCommunicationSpi(new ZkTestCommunicationSpi());
+ if (failCommSpi)
+ cfg.setCommunicationSpi(new PeerToPeerCommunicationFailureSpi());
+
if (commFailureRslvr != null)
cfg.setCommunicationFailureResolver(commFailureRslvr.apply());
@@ -3559,6 +3573,369 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest {
}
/**
+ * A simple split-brain test, where the cluster is split into 2 parts of server nodes (2 and 3).
+ * There is also a client which sees both parts of the split cluster.
+ *
+ * Result cluster should be: 3 server nodes + 1 client.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSimpleSplitBrain() throws Exception {
+ failCommSpi = true;
+
+ startGridsMultiThreaded(5);
+
+ client = true;
+
+ startGridsMultiThreaded(5, 3);
+
+ // Snapshot of all nodes started so far; the last client is started later and is not part of it.
+ List<ClusterNode> all = G.allGrids().stream()
+ .map(g -> g.cluster().localNode())
+ .collect(Collectors.toList());
+
+ List<ClusterNode> part1 = all.subList(0, 3);
+ List<ClusterNode> part2 = all.subList(3, all.size());
+
+ // Connections exist only inside each part, there are no connections between the parts.
+ ConnectionsFailureMatrix matrix = ConnectionsFailureMatrix.buildFrom(part1, part2);
+
+ ClusterNode lastClient = startGrid(8).cluster().localNode();
+
+ // Make last client connected to other nodes.
+ for (ClusterNode node : all) {
+ if (node.id().equals(lastClient.id()))
+ continue;
+
+ // Connections are directional in the matrix, so both directions are registered.
+ matrix.addConnection(lastClient, node);
+ matrix.addConnection(node, lastClient);
+ }
+
+ PeerToPeerCommunicationFailureSpi.fail(matrix);
+
+ waitForTopology(4);
+ }
+
+ /**
+ * A simple not actual split-brain test, where some connections between server nodes are lost.
+ * Server nodes: 5.
+ * Client nodes: 5.
+ * Lost connections between server nodes: 2.
+ *
+ * Result cluster should be: 3 server nodes + 5 clients.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNotActualSplitBrain() throws Exception {
+ failCommSpi = true;
+
+ startGridsMultiThreaded(5);
+
+ List<ClusterNode> srvNodes = G.allGrids().stream()
+ .map(g -> g.cluster().localNode())
+ .collect(Collectors.toList());
+
+ client = true;
+
+ startGridsMultiThreaded(5, 3);
+
+ client = false;
+
+ ConnectionsFailureMatrix matrix = new ConnectionsFailureMatrix();
+
+ matrix.addAll(G.allGrids().stream().map(g -> g.cluster().localNode()).collect(Collectors.toList()));
+
+ // Remove 2 connections between server nodes.
+ matrix.removeConnection(srvNodes.get(0), srvNodes.get(1));
+ matrix.removeConnection(srvNodes.get(1), srvNodes.get(0));
+ matrix.removeConnection(srvNodes.get(2), srvNodes.get(3));
+ matrix.removeConnection(srvNodes.get(3), srvNodes.get(2));
+
+ PeerToPeerCommunicationFailureSpi.fail(matrix);
+
+ waitForTopology(8);
+ }
+
+ /**
+ * Almost split-brain test: server nodes are split into 2 parts and there are some connections between these 2 parts.
+ * Server nodes: 6.
+ * Client nodes: 5.
+ * Split into: 3 servers + 2 clients and 3 servers + 2 clients.
+ * Extra connections between server nodes: 3.
+ *
+ * Result cluster should be: 3 server nodes + 2 clients.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAlmostSplitBrain() throws Exception {
+ failCommSpi = true;
+
+ startGridsMultiThreaded(6);
+
+ List<ClusterNode> srvNodes = G.allGrids().stream()
+ .map(g -> g.cluster().localNode())
+ .collect(Collectors.toList());
+
+ List<ClusterNode> srvPart1 = srvNodes.subList(0, 3);
+ List<ClusterNode> srvPart2 = srvNodes.subList(3, srvNodes.size());
+
+ client = true;
+
+ startGridsMultiThreaded(6, 5);
+
+ client = false;
+
+ List<ClusterNode> clientNodes = G.allGrids().stream()
+ .map(g -> g.cluster().localNode())
+ .filter(ClusterNode::isClient)
+ .collect(Collectors.toList());
+
+ List<ClusterNode> clientPart1 = clientNodes.subList(0, 2);
+ List<ClusterNode> clientPart2 = clientNodes.subList(2, 4);
+
+ List<ClusterNode> splittedPart1 = new ArrayList<>();
+ splittedPart1.addAll(srvPart1);
+ splittedPart1.addAll(clientPart1);
+
+ List<ClusterNode> splittedPart2 = new ArrayList<>();
+ splittedPart2.addAll(srvPart2);
+ splittedPart2.addAll(clientPart2);
+
+ ConnectionsFailureMatrix matrix = new ConnectionsFailureMatrix();
+
+ matrix.addAll(splittedPart1);
+ matrix.addAll(splittedPart2);
+
+ matrix.addConnection(srvPart1.get(0), srvPart2.get(1));
+ matrix.addConnection(srvPart2.get(1), srvPart1.get(0));
+
+ matrix.addConnection(srvPart1.get(1), srvPart2.get(2));
+ matrix.addConnection(srvPart2.get(2), srvPart1.get(1));
+
+ matrix.addConnection(srvPart1.get(2), srvPart2.get(0));
+ matrix.addConnection(srvPart2.get(0), srvPart1.get(2));
+
+ PeerToPeerCommunicationFailureSpi.fail(matrix);
+
+ waitForTopology(5);
+ }
+
+ /**
+ * Class represents available connections between cluster nodes.
+ * This is needed to simulate network problems in {@link PeerToPeerCommunicationFailureSpi}.
+ */
+ static class ConnectionsFailureMatrix {
+ /** Available connections per each node id. */
+ private Map<UUID, Set<UUID>> availableConnections = new HashMap<>();
+
+ /**
+ * @param from Cluster node 1.
+ * @param to Cluster node 2.
+ * @return {@code True} if there is connection between nodes {@code from} and {@code to}.
+ */
+ public boolean hasConnection(ClusterNode from, ClusterNode to) {
+ return availableConnections.getOrDefault(from.id(), Collections.emptySet()).contains(to.id());
+ }
+
+ /**
+ * Adds connection between nodes {@code from} and {@code to}.
+ * @param from Cluster node 1.
+ * @param to Cluster node 2.
+ */
+ public void addConnection(ClusterNode from, ClusterNode to) {
+ availableConnections.computeIfAbsent(from.id(), s -> new HashSet<>()).add(to.id());
+ }
+
+ /**
+ * Removes connection between nodes {@code from} and {@code to}.
+ * @param from Cluster node 1.
+ * @param to Cluster node 2.
+ */
+ public void removeConnection(ClusterNode from, ClusterNode to) {
+ availableConnections.getOrDefault(from.id(), Collections.emptySet()).remove(to.id());
+ }
+
+ /**
+ * Adds connections between all nodes presented in given {@code nodeSet}.
+ *
+ * @param nodeSet Set of the cluster nodes.
+ */
+ public void addAll(List<ClusterNode> nodeSet) {
+ for (int i = 0; i < nodeSet.size(); i++) {
+ for (int j = 0; j < nodeSet.size(); j++) {
+ if (i == j)
+ continue;
+
+ addConnection(nodeSet.get(i), nodeSet.get(j));
+ }
+ }
+ }
+
+ /**
+ * Builds connections failure matrix from two parts of the cluster nodes.
+ * Each part has all connections inside, but hasn't any connection to another part.
+ *
+ * @param part1 Part 1.
+ * @param part2 Part 2.
+ * @return Connections failure matrix.
+ */
+ static ConnectionsFailureMatrix buildFrom(List<ClusterNode> part1, List<ClusterNode> part2) {
+ ConnectionsFailureMatrix matrix = new ConnectionsFailureMatrix();
+ matrix.addAll(part1);
+ matrix.addAll(part2);
+ return matrix;
+ }
+ }
+
+ /**
+ * Communication SPI with possibility to simulate network problems between some of the cluster nodes.
+ */
+ static class PeerToPeerCommunicationFailureSpi extends TcpCommunicationSpi {
+ /** Flag indicates that connections according to {@code matrix} should be failed. */
+ private static volatile boolean failure;
+
+ /** Connections failure matrix. */
+ private static ConnectionsFailureMatrix matrix;
+
+ /**
+ * Start failing connections according to given matrix {@code with}.
+ * @param with Failure matrix.
+ */
+ public static void fail(ConnectionsFailureMatrix with) {
+ matrix = with;
+ failure = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
+ // Creates connections statuses according to failure matrix.
+ BitSet bitSet = new BitSet();
+
+ ClusterNode localNode = getLocalNode();
+
+ int idx = 0;
+
+ for (ClusterNode remoteNode : nodes) {
+ if (localNode.id().equals(remoteNode.id()))
+ bitSet.set(idx);
+ else {
+ if (matrix.hasConnection(localNode, remoteNode))
+ bitSet.set(idx);
+ }
+ idx++;
+ }
+
+ return new IgniteFinishedFutureImpl<>(bitSet);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCommunicationClient createTcpClient(
+ ClusterNode node,
+ int connIdx
+ ) throws IgniteCheckedException {
+ if (failure && !matrix.hasConnection(getLocalNode(), node)) {
+ processClientCreationError(node, null, new IgniteCheckedException("Test", new SocketTimeoutException()));
+
+ return null;
+ }
+
+ return new FailingCommunicationClient(getLocalNode(), node,
+ super.createTcpClient(node, connIdx));
+ }
+
+ /**
+ * Communication client with possibility to simulate network error between peers.
+ */
+ class FailingCommunicationClient implements GridCommunicationClient {
+ /** Delegate. */
+ private final GridCommunicationClient delegate;
+
+ /** Local node which sends messages. */
+ private final ClusterNode localNode;
+
+ /** Remote node which receives messages. */
+ private final ClusterNode remoteNode;
+
+ FailingCommunicationClient(ClusterNode localNode, ClusterNode remoteNode, GridCommunicationClient delegate) {
+ this.delegate = delegate;
+ this.localNode = localNode;
+ this.remoteNode = remoteNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException {
+ if (failure && !matrix.hasConnection(localNode, remoteNode))
+ throw new IgniteCheckedException("Test", new SocketTimeoutException());
+
+ delegate.doHandshake(handshakeC);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean close() {
+ return delegate.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void forceClose() {
+ delegate.forceClose();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean closed() {
+ return delegate.closed();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean reserve() {
+ return delegate.reserve();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+ delegate.release();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getIdleTime() {
+ return delegate.getIdleTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
+ if (failure && !matrix.hasConnection(localNode, remoteNode))
+ throw new IgniteCheckedException("Test", new SocketTimeoutException());
+
+ delegate.sendMessage(data);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException {
+ if (failure && !matrix.hasConnection(localNode, remoteNode))
+ throw new IgniteCheckedException("Test", new SocketTimeoutException());
+
+ delegate.sendMessage(data, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> c) throws IgniteCheckedException {
+ // This will enforce SPI to create new client.
+ if (failure && !matrix.hasConnection(localNode, remoteNode))
+ return true;
+
+ return delegate.sendMessage(nodeId, msg, c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean async() {
+ return delegate.async();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int connectionIndex() {
+ return delegate.connectionIndex();
+ }
+ }
+ }
+
+ /**
* @param srvs Number of server nodes in test.
* @throws Exception If failed.
*/
@@ -3886,6 +4263,8 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest {
err = false;
+ failCommSpi = false;
+
evts.clear();
try {