You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/19 11:34:40 UTC
[7/7] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4525b921
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4525b921
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4525b921
Branch: refs/heads/ignite-zk-ce
Commit: 4525b9218b17786b9ddf6a3932fc3422b8cfc735
Parents: 246b765
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 19 13:34:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 19 14:34:25 2017 +0300
----------------------------------------------------------------------
.../DefaultCommunicationProblemResolver.java | 199 +++++++++++++++++++
.../configuration/IgniteConfiguration.java | 12 +-
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../org/apache/ignite/internal/IgnitionEx.java | 3 +
.../internal/managers/GridManagerAdapter.java | 8 +
.../discovery/GridDiscoveryManager.java | 68 +++++++
.../managers/discovery/IgniteDiscoverySpi.java | 4 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 10 +
.../org/apache/ignite/spi/IgniteSpiContext.java | 4 +
.../communication/tcp/TcpCommunicationSpi.java | 18 +-
.../DefaultCommunicationProblemResolver.java | 174 ----------------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 4 +-
.../ZkCommunicationErrorProcessFuture.java | 2 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 33 +--
.../ZookeeperDiscoverySpiBasicTest.java | 102 ++++++++--
.../testframework/GridSpiTestContext.java | 10 +
17 files changed, 433 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
new file mode 100644
index 0000000..1e973d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import java.util.BitSet;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Default {@link CommunicationProblemResolver}. Finds the largest group of nodes connected to each other
+ * (in either direction, directly or transitively). If every node of that group can connect to every other
+ * node of the group, kills all nodes outside of it.
+ */
+public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver {
+ /** {@inheritDoc} */
+ @Override public void resolve(CommunicationProblemContext ctx) {
+ ClusterGraph graph = new ClusterGraph(ctx);
+
+ BitSet cluster = graph.findLargestIndependentCluster();
+
+ // Use the same snapshot the graph's node indices refer to.
+ List<ClusterNode> nodes = graph.nodes;
+
+ if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) {
+ for (int i = 0; i < nodes.size(); i++) {
+ if (!cluster.get(i))
+ ctx.killNode(nodes.get(i));
+ }
+ }
+ }
+
+ /**
+ * Connectivity graph over a topology snapshot. Nodes are identified by their index in
+ * {@link CommunicationProblemContext#topologySnapshot()}.
+ */
+ private static class ClusterGraph {
+ /** Shift converting a bit index into the index of its {@code long} word (64 bits per word). */
+ private static final int WORD_IDX_SHIFT = 6;
+
+ /**
+ * @param bitIndex Bit index.
+ * @return Word index containing bit with given index.
+ */
+ private static int wordIndex(int bitIndex) {
+ return bitIndex >> WORD_IDX_SHIFT;
+ }
+
+ /** Number of nodes in the topology snapshot. */
+ private final int nodeCnt;
+
+ /** Visited flags, one bit per node index. */
+ private final long[] visitBitSet;
+
+ /** Communication problem context. */
+ private final CommunicationProblemContext ctx;
+
+ /** Topology snapshot; list index is the node index used by all bit sets. */
+ private final List<ClusterNode> nodes;
+
+ /**
+ * @param ctx Context.
+ */
+ ClusterGraph(CommunicationProblemContext ctx) {
+ this.ctx = ctx;
+
+ nodes = ctx.topologySnapshot();
+
+ nodeCnt = nodes.size();
+
+ assert nodeCnt > 0;
+
+ visitBitSet = initBitSet(nodeCnt);
+ }
+
+ /**
+ * @param bitCnt Number of bits.
+ * @return Bit set words.
+ */
+ static long[] initBitSet(int bitCnt) {
+ return new long[wordIndex(bitCnt - 1) + 1];
+ }
+
+ /**
+ * @return Largest connected group of nodes (a connection in either direction counts), as a bit set
+ * of node indices.
+ */
+ BitSet findLargestIndependentCluster() {
+ BitSet maxCluster = null;
+ int maxClusterSize = 0;
+
+ for (int i = 0; i < nodeCnt; i++) {
+ if (getBit(visitBitSet, i))
+ continue;
+
+ BitSet cluster = new BitSet(nodeCnt);
+
+ search(cluster, i);
+
+ int size = cluster.cardinality();
+
+ if (maxCluster == null || size > maxClusterSize) {
+ maxCluster = cluster;
+ maxClusterSize = size;
+ }
+ }
+
+ return maxCluster;
+ }
+
+ /**
+ * @param cluster Cluster nodes bit set.
+ * @return {@code True} if all cluster nodes are able to connect to each other.
+ */
+ boolean checkFullyConnected(BitSet cluster) {
+ // connectionAvailable is not assumed symmetric, so every ordered pair of distinct members is checked.
+ for (int i = cluster.nextSetBit(0); i >= 0; i = cluster.nextSetBit(i + 1)) {
+ ClusterNode node1 = nodes.get(i);
+
+ // Iterate over set bits: members are not necessarily the first cardinality() indices.
+ for (int j = cluster.nextSetBit(0); j >= 0; j = cluster.nextSetBit(j + 1)) {
+ if (i == j)
+ continue;
+
+ if (!ctx.connectionAvailable(node1, nodes.get(j)))
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Adds to {@code cluster} every not yet visited node reachable from {@code idx}, marking them visited.
+ * NOTE(review): recursion depth can reach the topology size; consider an iterative traversal
+ * for very large topologies.
+ *
+ * @param cluster Current cluster bit set.
+ * @param idx Node index.
+ */
+ void search(BitSet cluster, int idx) {
+ setBit(visitBitSet, idx);
+
+ cluster.set(idx);
+
+ ClusterNode node1 = nodes.get(idx);
+
+ for (int i = 0; i < nodeCnt; i++) {
+ if (i == idx || getBit(visitBitSet, i))
+ continue;
+
+ ClusterNode node2 = nodes.get(i);
+
+ boolean connected = ctx.connectionAvailable(node1, node2) ||
+ ctx.connectionAvailable(node2, node1);
+
+ if (connected)
+ search(cluster, i);
+ }
+ }
+
+ /**
+ * @param words Bit set words.
+ * @param bitIndex Bit index.
+ */
+ static void setBit(long[] words, int bitIndex) {
+ int wordIndex = wordIndex(bitIndex);
+
+ // Long shift distance is masked to its low 6 bits, i.e. (bitIndex % 64).
+ words[wordIndex] |= (1L << bitIndex);
+ }
+
+ /**
+ * @param words Bit set words.
+ * @param bitIndex Bit index.
+ * @return Bit value.
+ */
+ static boolean getBit(long[] words, int bitIndex) {
+ int wordIndex = wordIndex(bitIndex);
+
+ return (words[wordIndex] & (1L << bitIndex)) != 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 8c3c818..dbf5fbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -595,12 +595,22 @@ public class IgniteConfiguration {
warmupClos = cfg.getWarmupClosure();
}
+ /**
+ * Gets communication problem resolver.
+ *
+ * @return Communication problem resolver, or {@code null} if none is set.
+ */
public CommunicationProblemResolver getCommunicationProblemResolver() {
return commProblemRslvr;
}
- public void setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) {
+ /**
+ * Sets communication problem resolver.
+ *
+ * @param commProblemRslvr Communication problem resolver.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) {
this.commProblemRslvr = commProblemRslvr;
+
+ return this;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a641e6e..85bb7c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2696,6 +2696,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
objs.add(cfg.getGridLogger());
objs.add(cfg.getMBeanServer());
+ if (cfg.getCommunicationProblemResolver() != null)
+ objs.add(cfg.getCommunicationProblemResolver());
+
return objs;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 62ebcd8..8ae6313 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -71,6 +71,7 @@ import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
@@ -2270,6 +2271,8 @@ public class IgnitionEx {
initializeDefaultSpi(myCfg);
+ GridDiscoveryManager.initCommunicationErrorResolveConfiguration(myCfg);
+
initializeDefaultCacheConfiguration(myCfg);
ExecutorConfiguration[] execCfgs = myCfg.getExecutorConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index a151eb5..df84212 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -604,6 +604,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
return ctx.nodeAttributes();
}
+ /** {@inheritDoc} */
+ @Override public boolean communicationErrorResolveSupported() {
+ return ctx.discovery().communicationErrorResolveSupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+ ctx.discovery().resolveCommunicationError(node, err);
+ }
+
/**
* @param e Exception to handle.
* @return GridSpiException Converted exception.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 97441d7..172615c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -53,8 +53,11 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CommunicationProblemResolver;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DefaultCommunicationProblemResolver;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
@@ -108,6 +111,8 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
@@ -545,6 +550,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
});
}
+ if (ctx.config().getCommunicationProblemResolver() != null)
+ ctx.resource().injectGeneric(ctx.config().getCommunicationProblemResolver());
+
spi.setListener(new DiscoverySpiListener() {
private long gridStartTime;
@@ -2336,6 +2344,66 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
).start();
}
+ /**
+ * Validates communication problem resolver configuration. If no resolver is configured, but both
+ * communication and discovery SPIs support communication error resolve, installs
+ * {@link DefaultCommunicationProblemResolver}.
+ *
+ * @param cfg Configuration.
+ * @throws IgniteCheckedException If resolver is configured, but communication or discovery SPI
+ * does not support communication error resolve.
+ */
+ public static void initCommunicationErrorResolveConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException {
+ CommunicationProblemResolver rslvr = cfg.getCommunicationProblemResolver();
+ CommunicationSpi commSpi = cfg.getCommunicationSpi();
+ DiscoverySpi discoverySpi = cfg.getDiscoverySpi();
+
+ if (rslvr != null) {
+ if (!supportsCommunicationErrorResolve(commSpi))
+ throw new IgniteCheckedException("CommunicationProblemResolver is configured, but communication SPI does not support communication " +
+ "problem resolve: " + commSpi.getClass().getName());
+
+ if (!supportsCommunicationErrorResolve(discoverySpi))
+ throw new IgniteCheckedException("CommunicationProblemResolver is configured, but discovery SPI does not support communication " +
+ "problem resolve: " + discoverySpi.getClass().getName());
+ }
+ else if (supportsCommunicationErrorResolve(commSpi) && supportsCommunicationErrorResolve(discoverySpi))
+ cfg.setCommunicationProblemResolver(new DefaultCommunicationProblemResolver());
+ }
+
+ /**
+ * Checks whether the given discovery SPI can resolve communication errors.
+ *
+ * @param spi Discovery SPI ({@code null} yields {@code false}).
+ * @return {@code True} only if {@code spi} is an {@link IgniteDiscoverySpi} reporting support for
+ * communication error resolve.
+ */
+ private static boolean supportsCommunicationErrorResolve(DiscoverySpi spi) {
+ if (!(spi instanceof IgniteDiscoverySpi))
+ return false;
+
+ IgniteDiscoverySpi igniteSpi = (IgniteDiscoverySpi)spi;
+
+ return igniteSpi.supportsCommunicationErrorResolve();
+ }
+
+ /**
+ * @param spi Communication SPI.
+ * @return {@code True} if SPI supports communication error resolve (only {@link TcpCommunicationSpi} does).
+ */
+ private static boolean supportsCommunicationErrorResolve(CommunicationSpi spi) {
+ return spi instanceof TcpCommunicationSpi;
+ }
+
+ /**
+ * @return {@code True} if communication error resolve is supported.
+ */
+ public boolean communicationErrorResolveSupported() {
+ return ctx.config().getCommunicationProblemResolver() != null;
+ }
+
+ /**
+ * Delegates communication error resolve to the discovery SPI.
+ *
+ * @param node Problem node.
+ * @param err Communication error.
+ * @throws UnsupportedOperationException If discovery or communication SPI does not support error resolve.
+ */
+ public void resolveCommunicationError(ClusterNode node, Exception err) {
+ DiscoverySpi discoSpi = getSpi();
+
+ boolean supported = supportsCommunicationErrorResolve(discoSpi) &&
+ supportsCommunicationErrorResolve(ctx.config().getCommunicationSpi());
+
+ if (supported)
+ ((IgniteDiscoverySpi)discoSpi).resolveCommunicationError(node, err);
+ else
+ throw new UnsupportedOperationException();
+ }
+
/** Worker for network segment checks. */
private class SegmentCheckWorker extends GridWorker {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index 9a1faa2..bf117f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.spi.discovery.DiscoverySpi;
/**
- * TODO ZK
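+ * {@link DiscoverySpi} extension with additional internal capabilities, such as communication error resolve.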
*/
public interface IgniteDiscoverySpi extends DiscoverySpi {
/**
@@ -63,5 +63,5 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
* @param node Problem node.
* @param err Connection error.
*/
- public void onCommunicationConnectionError(ClusterNode node, Exception err);
+ public void resolveCommunicationError(ClusterNode node, Exception err);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 07ba214..50cf9fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -957,5 +957,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
@Override public Map<String, Object> nodeAttributes() {
return Collections.emptyMap();
}
+
+ /** {@inheritDoc} */
+ @Override public boolean communicationErrorResolveSupported() {
+ // Communication error resolve is never available through this context.
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+ // Callers (e.g. TcpCommunicationSpi) check communicationErrorResolveSupported() first, which is false here.
+ throw new UnsupportedOperationException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 96b3e61..f9c6ffd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -358,4 +358,8 @@ public interface IgniteSpiContext {
* @return Current node attributes.
*/
public Map<String, Object> nodeAttributes();
+
+ /**
+ * @return {@code True} if {@link #resolveCommunicationError(ClusterNode, Exception)} is supported.
+ */
+ public boolean communicationErrorResolveSupported();
+
+ /**
+ * Requests resolve of a communication error with the given node. Implementations may throw
+ * {@link IgniteSpiException}, e.g. if the problem node has already failed.
+ *
+ * @param node Problem node.
+ * @param err Communication error.
+ * @throws UnsupportedOperationException If communication error resolve is not supported.
+ */
+ public void resolveCommunicationError(ClusterNode node, Exception err);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 957a0e1..37be29f 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3409,22 +3409,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
boolean commErrResolve = false;
- if (connectionError(errs)) {
- DiscoverySpi discoverySpi = ignite.configuration().getDiscoverySpi();
+ IgniteSpiContext ctx = getSpiContext();
- if (discoverySpi instanceof IgniteDiscoverySpi) {
- IgniteDiscoverySpi discoverySpi0 = (IgniteDiscoverySpi)discoverySpi;
+ if (connectionError(errs) && ctx.communicationErrorResolveSupported()) {
+ commErrResolve = true;
- if (discoverySpi0.supportsCommunicationErrorResolve()) {
- commErrResolve = true;
-
- discoverySpi0.onCommunicationConnectionError(node, errs);
- }
- }
+ ctx.resolveCommunicationError(node, errs);
}
if (!commErrResolve && enableForcibleNodeKill) {
- if (getSpiContext().node(node.id()) != null
+ if (ctx.node(node.id()) != null
&& (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
connectionError(errs)) {
String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
@@ -3435,7 +3429,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
else
U.warn(log, msg);
- getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+ ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
"rmtNode=" + node +
", errs=" + errs +
", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
deleted file mode 100644
index b2d4bf0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery;
-
-import java.util.BitSet;
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CommunicationProblemContext;
-import org.apache.ignite.configuration.CommunicationProblemResolver;
-
-/**
- *
- */
-public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver {
- /** {@inheritDoc} */
- @Override public void resolve(CommunicationProblemContext ctx) {
- ClusterGraph graph = new ClusterGraph(ctx);
-
- BitSet cluster = graph.findLargestIndependentCluster();
-
- List<ClusterNode> nodes = ctx.topologySnapshot();
-
- if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) {
- for (int i = 0; i < nodes.size(); i++) {
- if (!cluster.get(i))
- ctx.killNode(nodes.get(i));
- }
- }
- }
-
- /**
- *
- */
- private static class ClusterGraph {
- /** */
- private final static int WORD_IDX_SHIFT = 6;
-
- /**
- * @param bitIndex Bit index.
- * @return Word index containing bit with given index.
- */
- private static int wordIndex(int bitIndex) {
- return bitIndex >> WORD_IDX_SHIFT;
- }
-
- /** */
- private final int nodeCnt;
-
- /** */
- private final long[] visitBitSet;
-
- /** */
- private final CommunicationProblemContext ctx;
-
- /** */
- private final List<ClusterNode> nodes;
-
- ClusterGraph(CommunicationProblemContext ctx) {
- this.ctx = ctx;
-
- nodes = ctx.topologySnapshot();
-
- nodeCnt = nodes.size();
-
- assert nodeCnt > 0;
-
- visitBitSet = initBitSet(nodeCnt);
- }
-
- static long[] initBitSet(int bitCnt) {
- return new long[wordIndex(bitCnt - 1) + 1];
- }
-
- BitSet findLargestIndependentCluster() {
- BitSet maxCluster = null;
- int maxClusterSize = 0;
-
- for (int i = 0; i < nodeCnt; i++) {
- if (getBit(visitBitSet, i))
- continue;
-
- BitSet cluster = new BitSet(nodeCnt);
-
- search(cluster, i);
-
- int size = cluster.cardinality();
-
- if (maxCluster == null || size > maxClusterSize) {
- maxCluster = cluster;
- maxClusterSize = size;
- }
- }
-
- return maxCluster;
- }
-
- boolean checkFullyConnected(BitSet cluster) {
- int startIdx = 0;
-
- int clusterNodes = cluster.cardinality();
-
- for (;;) {
- int idx = cluster.nextSetBit(startIdx);
-
- if (idx == -1)
- break;
-
- ClusterNode node1 = nodes.get(idx);
-
- for (int i = 0; i < clusterNodes; i++) {
- if (!cluster.get(i) || i == idx)
- continue;
-
- ClusterNode node2 = nodes.get(i);
-
- if (cluster.get(i) && ctx.connectionAvailable(node1, node2))
- return false;
- }
-
- startIdx = idx + 1;
- }
-
- return true;
- }
-
- void search(BitSet cluster, int idx) {
- setBit(visitBitSet, idx);
-
- cluster.set(idx);
-
- ClusterNode node1 = nodes.get(idx);
-
- for (int i = 0; i < nodeCnt; i++) {
- if (i == idx || getBit(visitBitSet, i))
- continue;
-
- ClusterNode node2 = nodes.get(i);
-
- boolean connected = ctx.connectionAvailable(node1, node2) ||
- ctx.connectionAvailable(node2, node1);
-
- if (connected)
- search(cluster, i);
- }
- }
-
- static void setBit(long words[], int bitIndex) {
- int wordIndex = wordIndex(bitIndex);
-
- words[wordIndex] |= (1L << bitIndex);
- }
-
- static boolean getBit(long[] words, int bitIndex) {
- int wordIndex = wordIndex(bitIndex);
-
- return (words[wordIndex] & (1L << bitIndex)) != 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 0e2f851..ad8eca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2111,7 +2111,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
}
/** {@inheritDoc} */
- @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) {
+ @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 98a22d7..14bb107 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -195,8 +195,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
}
/** {@inheritDoc} */
- @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) {
- impl.onCommunicationConnectionError(node, err);
+ @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+ impl.resolveCommunicationError(node, err);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index 0074817..e64c801 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -148,7 +148,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
* @param futPath Future path.
* @param nodes Nodes to ping.
*/
- void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception {
+ void checkConnection(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) {
final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index ef3504f..f1ad869 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.util.GridIntList;
@@ -247,17 +248,17 @@ public class ZookeeperDiscoveryImpl {
* @param node0 Problem node ID
* @param err Connect error.
*/
- public void onCommunicationConnectionError(ClusterNode node0, Exception err) {
- checkState();
-
+ public void resolveCommunicationError(ClusterNode node0, Exception err) {
ZookeeperClusterNode node = node(node0.id());
if (node == null)
- return;
+ throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id()));
IgniteInternalFuture<Boolean> nodeStatusFut;
for (;;) {
+ checkState();
+
ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
if (fut == null || fut.isDone()) {
@@ -273,16 +274,16 @@ public class ZookeeperDiscoveryImpl {
", err= " + err + ']');
}
- ConnectionState connState;
+ try {
+ checkState();
+ }
+ catch (Exception e) {
+ fut.onError(e);
- synchronized (this) {
- connState = this.connState;
+ throw e;
}
- if (connState != ConnectionState.STARTED)
- fut.onError(new IgniteCheckedException("Node stopped."));
- else
- fut.scheduleCheckOnTimeout();
+ fut.scheduleCheckOnTimeout();
}
else
fut = commErrProcFut.get();
@@ -303,7 +304,8 @@ public class ZookeeperDiscoveryImpl {
}
try {
- nodeStatusFut.get();
+ if (!nodeStatusFut.get())
+ throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id()));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException(e);
@@ -2240,7 +2242,7 @@ public class ZookeeperDiscoveryImpl {
runInWorkerThread(new ZkRunnable(rtState, this) {
@Override protected void run0() throws Exception {
- fut0.pingNodesAndNotifyFuture(rtState, futPath, rtState.commErrProcNodes);
+ fut0.checkConnection(rtState, futPath, rtState.commErrProcNodes);
}
});
}
@@ -2341,7 +2343,8 @@ public class ZookeeperDiscoveryImpl {
", rslvr=" + rslvr.getClass().getSimpleName() + ']');
}
- ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(topSnapshot,
+ ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(
+ topSnapshot,
initialNodes,
nodesRes);
@@ -2352,7 +2355,7 @@ public class ZookeeperDiscoveryImpl {
if (killedNodes != null) {
if (log.isInfoEnabled()) {
- log.info("Communication error resolver forces nodes stop [reqId=" + futId +
+ log.info("Communication error resolver forced nodes stop [reqId=" + futId +
", killNodeCnt=" + killedNodes.size() +
", nodeIds=" + U.nodeIds(killedNodes) + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 58098b9..44e48f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
@@ -72,9 +73,11 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.logger.java.JavaLogger;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.configuration.CommunicationProblemContext;
import org.apache.ignite.configuration.CommunicationProblemResolver;
@@ -147,7 +150,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
private boolean persistence;
/** */
- private CommunicationProblemResolver commProblemRslvr;
+ private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -248,7 +251,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
cfg.setCommunicationSpi(new ZkTestCommunicationSpi());
if (commProblemRslvr != null)
- cfg.setCommunicationProblemResolver(commProblemRslvr);
+ cfg.setCommunicationProblemResolver(commProblemRslvr.apply());
return cfg;
}
@@ -1808,7 +1811,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
assert nodes > 1;
sesTimeout = 2000;
- commProblemRslvr = new NoOpCommunicationProblemResolver();
+ commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
startGridsMultiThreaded(nodes);
@@ -1828,7 +1831,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
ZookeeperDiscoverySpi spi = spi(ignite(idx1));
- spi.onCommunicationConnectionError(ignite(idx2).cluster().localNode(), new Exception("test"));
+ spi.resolveCommunicationError(ignite(idx2).cluster().localNode(), new Exception("test"));
checkInternalStructuresCleanup();
}
@@ -1840,7 +1843,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
public void testNoOpCommunicationErrorResolve_3() throws Exception {
// One node fails before sending communication status.
sesTimeout = 2000;
- commProblemRslvr = new NoOpCommunicationProblemResolver();
+ commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
startGridsMultiThreaded(3);
@@ -1855,7 +1858,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
@Override public Object call() {
ZookeeperDiscoverySpi spi = spi(ignite(0));
- spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test"));
+ spi.resolveCommunicationError(ignite(1).cluster().localNode(), new Exception("test"));
return null;
}
@@ -1883,11 +1886,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testNoOpCommunicationErrorResolve_4() throws Exception {
- // Coordinator changes while resolve process is in progress.
+ // Coordinator fails while resolve process is in progress.
testCommSpi = true;
sesTimeout = 2000;
- commProblemRslvr = new NoOpCommunicationProblemResolver();
+ commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
startGrid(0);
@@ -1901,7 +1904,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
@Override public Object call() {
ZookeeperDiscoverySpi spi = spi(ignite(1));
- spi.onCommunicationConnectionError(ignite(2).cluster().localNode(), new Exception("test"));
+ spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test"));
return null;
}
@@ -1970,6 +1973,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
}
/**
+ * TODO ZK: kill random, kill coordinator multiple times.
+ *
* @param startNodes Number of nodes to start.
* @param killNodes Nodes to kill by resolve process.
* @throws Exception If failed.
@@ -1977,7 +1982,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception {
testCommSpi = true;
- commProblemRslvr = new TestNodeKillCommunicationProblemResolver(killNodes);
+ commProblemRslvr = TestNodeKillCommunicationProblemResolver.factory(killNodes);
startGrids(startNodes);
@@ -1986,20 +1991,28 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
commSpi.checkRes = new BitSet(startNodes);
ZookeeperDiscoverySpi spi = null;
+ UUID killNodeId = null;
for (Ignite node : G.allGrids()) {
ZookeeperDiscoverySpi spi0 = spi(node);
- if (!killNodes.contains(node.cluster().localNode().order())) {
+ if (!killNodes.contains(node.cluster().localNode().order()))
spi = spi0;
-
- break;
- }
+ else
+ killNodeId = node.cluster().localNode().id();
}
assertNotNull(spi);
+ assertNotNull(killNodeId);
- spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test"));
+ try {
+ spi.resolveCommunicationError(spi.getNode(killNodeId), new Exception("test"));
+
+ fail("Exception is not thrown");
+ }
+ catch (IgniteSpiException e) {
+ assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
+ }
int expNodes = startNodes - killNodes.size();
@@ -2016,6 +2029,46 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+    public void testDefaultCommunicationErrorResolver1() throws Exception {
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        startGrids(3);
+        // Presumably bit i set = node i reachable: nodes 0 and 1 see each other, node 2 sees only itself.
+        {
+            ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(0));
+            commSpi.checkRes = new BitSet(3);
+            commSpi.checkRes.set(0);
+            commSpi.checkRes.set(1);
+        }
+        {
+            ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(1));
+            commSpi.checkRes = new BitSet(3);
+            commSpi.checkRes.set(0);
+            commSpi.checkRes.set(1);
+        }
+        {
+            ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(2));
+            commSpi.checkRes = new BitSet(3);
+            commSpi.checkRes.set(2);
+        }
+        // Node 2 is the node the resolve process is expected to stop (checked by assertNull at the end).
+        UUID killedId = nodeId(2);
+
+        assertNotNull(ignite(0).cluster().node(killedId));
+
+        ZookeeperDiscoverySpi spi = spi(ignite(0));
+        // Target is node 1, which is expected to survive, so this call is expected to return normally.
+        spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test"));
+
+        waitForTopology(2);
+
+        assertNull(ignite(0).cluster().node(killedId));
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testConnectionCheck() throws Exception {
final int NODES = 5;
@@ -2709,6 +2762,13 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
*
*/
static class NoOpCommunicationProblemResolver implements CommunicationProblemResolver {
+    /** Factory creating a fresh {@link NoOpCommunicationProblemResolver} on each {@code apply()} call. */
+    static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() {
+        @Override public CommunicationProblemResolver apply() {
+            return new NoOpCommunicationProblemResolver();
+        }
+    };
+
/** {@inheritDoc} */
@Override public void resolve(CommunicationProblemContext ctx) {
// No-op.
@@ -2719,6 +2779,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
*
*/
static class TestNodeKillCommunicationProblemResolver implements CommunicationProblemResolver {
+    /**
+     * @param killOrders Orders of the nodes to kill; passed as-is (not copied) to every created resolver.
+     * @return Factory creating a new {@link TestNodeKillCommunicationProblemResolver} on each call.
+     */
+    static IgniteOutClosure<CommunicationProblemResolver> factory(final Collection<Long> killOrders) {
+        return new IgniteOutClosure<CommunicationProblemResolver>() {
+            @Override public CommunicationProblemResolver apply() {
+                return new TestNodeKillCommunicationProblemResolver(killOrders);
+            }
+        };
+    }
+
/** */
final Collection<Long> killNodeOrders;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 93cd911..36403fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -607,6 +607,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
return Collections.emptyMap();
}
+    /** {@inheritDoc} Always {@code false} in this test context. */
+    @Override public boolean communicationErrorResolveSupported() {
+        return false;
+    }
+
+    /** {@inheritDoc} Always throws {@link UnsupportedOperationException} in this test context. */
+    @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+        throw new UnsupportedOperationException();
+    }
+
/**
* @param cacheName Cache name.
* @return Map representing cache.