You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/24 08:47:41 UTC
[34/50] [abbrv] ignite git commit: ignite-6519 Race in
SplitAwareTopologyValidator on activator and server node join
ignite-6519 Race in SplitAwareTopologyValidator on activator and server node join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d90b8fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d90b8fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d90b8fe
Branch: refs/heads/ignite-3478-tree
Commit: 5d90b8feb5eb65ce190ca4106d31f386c7be42a3
Parents: 01daee6
Author: Alexandr Kuramshin <ak...@gridgain.com>
Authored: Mon Oct 23 15:28:28 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 23 15:28:28 2017 +0300
----------------------------------------------------------------------
.../internal/TestRecordingCommunicationSpi.java | 12 +
...niteTopologyValidatorGridSplitCacheTest.java | 358 +++++++++++++++----
.../IgniteCacheTopologySplitAbstractTest.java | 266 ++++++++++++++
3 files changed, 564 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index ab61687..cf4f059 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -71,6 +72,12 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
+ // All Ignite code expects 'send' to fail once the discovery listener for the node failure has finished.
+ if (getSpiContext().node(node.id()) == null) {
+ throw new IgniteSpiException(new ClusterTopologyCheckedException("Failed to send message" +
+ " (node left topology): " + node));
+ }
+
if (msg instanceof GridIoMessage) {
GridIoMessage ioMsg = (GridIoMessage)msg;
@@ -115,6 +122,11 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
super.sendMessage(node, msg, ackC);
}
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ sendMessage(node, msg, null);
+ }
+
/**
* @param recordP Record predicate.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
index 1f3b875..1885e9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -17,32 +17,43 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT;
/**
* Tests complex scenario with topology validator. Grid is split between to data centers, defined by attribute {@link
* #DC_NODE_ATTR}. If only nodes from single DC are left in topology, grid is moved into inoperative state until special
* activator node'll enter a topology, enabling grid operations.
*/
-public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstractTest {
+public class IgniteTopologyValidatorGridSplitCacheTest extends IgniteCacheTopologySplitAbstractTest {
+
/** */
private static final String DC_NODE_ATTR = "dc";
@@ -50,10 +61,10 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
private static final String ACTIVATOR_NODE_ATTR = "split.resolved";
/** */
- private static final int GRID_CNT = 8;
+ private static final int GRID_CNT = 32;
/** */
- private static final int CACHES_CNT = 100;
+ private static final int CACHES_CNT = 50;
/** */
private static final int RESOLVER_GRID_IDX = GRID_CNT;
@@ -62,7 +73,62 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
private static final int CONFIGLESS_GRID_IDX = GRID_CNT + 1;
/** */
- private boolean useCacheGrp = false;
+ private static final String STATIC_IP = "127.0.0.1";
+
+ /** */
+ private static final Collection<String> SEG_FINDER_0;
+
+ /** */
+ private static final Collection<String> SEG_FINDER_1;
+
+ /** */
+ private static final Collection<String> SEG_FINDER_ALL;
+
+ static {
+ Collection<String> seg0 = new ArrayList<>();
+
+ Collection<String> seg1 = new ArrayList<>();
+
+ for (int i = 0; i < GRID_CNT; i += 2) {
+ seg0.add(STATIC_IP + ':' + (DFLT_PORT + i));
+
+ seg1.add(STATIC_IP + ':' + (DFLT_PORT + i + 1));
+ }
+ SEG_FINDER_0 = Collections.unmodifiableCollection(seg0);
+
+ SEG_FINDER_1 = Collections.unmodifiableCollection(seg1);
+
+ SEG_FINDER_ALL = F.concat(false, SEG_FINDER_0, SEG_FINDER_1);
+ }
+
+ /** */
+ private boolean useCacheGrp;
+
+ /** */
+ private int getDiscoPort(int gridIdx) {
+ return DFLT_PORT + gridIdx;
+ }
+
+ /** */
+ private boolean isDiscoPort(int port) {
+ return port >= DFLT_PORT &&
+ port <= (DFLT_PORT + TcpDiscoverySpi.DFLT_PORT_RANGE);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isBlocked(int locPort, int rmtPort) {
+ return isDiscoPort(locPort) && isDiscoPort(rmtPort) && segment(locPort) != segment(rmtPort);
+ }
+
+ /** */
+ private int segment(int discoPort) {
+ return (discoPort - DFLT_PORT) % 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int segment(ClusterNode node) {
+ return node.attribute(DC_NODE_ATTR);
+ }
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -70,17 +136,32 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
int idx = getTestIgniteInstanceIndex(gridName);
- cfg.setUserAttributes(F.asMap(DC_NODE_ATTR, idx % 2));
+ Map<String, Object> userAttrs = new HashMap<>(4);
+
+ int segment = idx % 2;
+
+ userAttrs.put(DC_NODE_ATTR, segment);
+
+ TcpDiscoverySpi disco = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+ disco.setLocalPort(getDiscoPort(idx));
+
+ disco.setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(segmented() ?
+ (segment == 0 ? SEG_FINDER_0 : SEG_FINDER_1) : SEG_FINDER_ALL));
if (idx != CONFIGLESS_GRID_IDX) {
if (idx == RESOLVER_GRID_IDX) {
cfg.setClientMode(true);
- cfg.setUserAttributes(F.asMap(ACTIVATOR_NODE_ATTR, "true"));
+ userAttrs.put(ACTIVATOR_NODE_ATTR, "true");
}
else
cfg.setActiveOnStart(false);
}
+ cfg.setUserAttributes(userAttrs);
+
+ cfg.setMemoryConfiguration(new MemoryConfiguration().
+ setDefaultMemoryPolicySize((50L << 20) + (100L << 20) * CACHES_CNT / GRID_CNT));
return cfg;
}
@@ -129,6 +210,12 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
stopAllGrids();
}
+ /** */
+ protected void stopGrids(int... grids) {
+ for (int idx : grids)
+ stopGrid(idx);
+ }
+
/**
* Tests topology split scenario.
*
@@ -149,8 +236,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
/**
* Tests topology split scenario.
- * @param useCacheGrp Use cache group.
*
+ * @param useCacheGrp Use cache group.
* @throws Exception If failed.
*/
private void testTopologyValidator0(boolean useCacheGrp) throws Exception {
@@ -161,31 +248,26 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
grid.getOrCreateCaches(getCacheConfigurations());
// Init grid index arrays
- int[] dc1 = new int[GRID_CNT / 2];
+ int[] seg1 = new int[GRID_CNT / 2];
- for (int i = 0; i < dc1.length; ++i)
- dc1[i] = i * 2 + 1;
+ for (int i = 0; i < seg1.length; ++i)
+ seg1[i] = i * 2 + 1;
- int[] dc0 = new int[GRID_CNT - dc1.length];
+ int[] seg0 = new int[GRID_CNT - seg1.length];
- for (int i = 0; i < dc0.length; ++i)
- dc0[i] = i * 2;
+ for (int i = 0; i < seg0.length; ++i)
+ seg0[i] = i * 2;
// Tests what each node is able to do puts.
- tryPut(dc0);
-
- tryPut(dc1);
+ tryPut(seg0, seg1);
clearAll();
// Force segmentation.
- for (int idx : dc1)
- stopGrid(idx);
-
- awaitPartitionMapExchange();
+ splitAndWait();
try {
- tryPut(dc0);
+ tryPut(seg0, seg1);
fail();
}
@@ -196,24 +278,41 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
// Repair split by adding activator node in topology.
resolveSplit();
- tryPut(dc0);
+ tryPut(seg0);
clearAll();
+ try {
+ tryPut(seg1);
+
+ fail();
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+
+ stopGrids(seg1);
+
// Fix split by adding node from second DC.
+ unsplit();
+
startGrid(CONFIGLESS_GRID_IDX);
awaitPartitionMapExchange();
+ tryPut(seg0);
+
tryPut(CONFIGLESS_GRID_IDX);
+ clearAll();
+
// Force split by removing last node from second DC.
stopGrid(CONFIGLESS_GRID_IDX);
awaitPartitionMapExchange();
try {
- tryPut(dc0);
+ tryPut(seg0);
fail();
}
@@ -221,10 +320,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
// No-op.
}
+ // Repair split with concurrent server node join race.
+ resolveSplitWithRace(CONFIGLESS_GRID_IDX);
+
// Repair split by adding activator node in topology.
resolveSplit();
- tryPut(dc0);
+ tryPut(seg0);
clearAll();
@@ -233,9 +335,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
awaitPartitionMapExchange();
- for (int i = 0; i < dc0.length; i++) {
- int idx = dc0[i];
-
+ for (int idx : seg0) {
if (idx == 0)
continue;
@@ -249,7 +349,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
awaitPartitionMapExchange();
- assertEquals("Expecting put count", CACHES_CNT * dc0.length, tryPut(dc0));
+ assertEquals("Expecting put count", CACHES_CNT * seg0.length, tryPut(seg0));
}
/**
@@ -277,39 +377,132 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
}
/**
- * @param grids Grids to test.
+ * Resolves split by client node join with server node join race simulation.
+ *
+ * @param srvNode server node index to simulate join race
+ * @throws Exception If failed.
*/
- private int tryPut(int... grids) {
+ private void resolveSplitWithRace(int srvNode) throws Exception {
+ startGrid(RESOLVER_GRID_IDX);
+
+ startGrid(srvNode);
+
+ awaitPartitionMapExchange();
+
+ tryPut(srvNode);
+
+ clearAll();
+
+ stopGrid(srvNode);
+
+ awaitPartitionMapExchange();
+
+ try {
+ tryPut(0);
+
+ fail();
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+
+ stopGrid(RESOLVER_GRID_IDX);
+ }
+
+ /**
+ * @param idx Grid to test.
+ * @return number of successful puts to caches
+ * @throws IgniteException If at least one put failed (individual failures are attached as suppressed).
+ * @throws AssertionError If no key is found for which the local node is primary.
+ */
+ private int tryPut(int idx) {
+ IgniteEx g = grid(idx);
+
int putCnt = 0;
- for (int i = 0; i < grids.length; i++) {
- IgniteEx g = grid(grids[i]);
- for (int cnt = 0; cnt < CACHES_CNT; cnt++) {
- String cacheName = testCacheName(cnt);
+ IgniteException ex = null;
- for (int k = 0; k < 100; k++) {
- if (g.affinity(cacheName).isPrimary(g.localNode(), k)) {
- IgniteCache<Object, Object> cache = g.cache(cacheName);
+ for (int cnt = 0; cnt < CACHES_CNT; cnt++) {
+ String cacheName = testCacheName(cnt);
- try {
- cache.put(k, k);
- }
- catch (Throwable t) {
- log.error("Failed to put entry: [cache=" + cacheName + ", key=" + k + ", nodeId=" +
- g.name() + ']', t);
+ int key = -1;
- throw t;
- }
+ Affinity<Object> aff = g.affinity(cacheName);
+
+ for (int k = 0; k < aff.partitions(); k++) {
+ if (aff.isPrimary(g.cluster().localNode(), k)) {
+ key = k;
- assertEquals(1, cache.localSize());
+ break;
+ }
+ }
- putCnt++;
+ assertTrue("Failed to find affinity key [gridIdx=" + idx +", cache=" + cacheName + ']',
+ key != -1);
- break;
- }
+ IgniteCache<Object, Object> cache = g.cache(cacheName);
+
+ try {
+ cache.put(key, key);
+
+ assertEquals(1, cache.localSize());
+
+ if (ex != null)
+ throw new AssertionError("Successful tryPut after failure [gridIdx=" + idx +
+ ", cacheName=" + cacheName + ']', ex);
+
+ putCnt++;
+ }
+ catch (Throwable t) {
+ IgniteException e = new IgniteException("Failed to put entry [cache=" + cacheName + ", key=" +
+ key + ']', t);
+
+ log.error(e.getMessage(), e.getCause());
+
+ if (ex == null)
+ ex = new IgniteException("Failed to put entry [node=" + g.name() + ']');
+
+ ex.addSuppressed(t);
+ }
+ }
+ if (ex != null)
+ throw ex;
+
+ return putCnt;
+ }
+
+ /**
+ * @param grids Grids to test.
+ * @return number of successful puts to caches
+ * @throws IgniteException If all tries to put was failed.
+ * @throws AssertionError If some of tries to put was failed.
+ */
+ private int tryPut(int[]... grids) {
+ int putCnt = 0;
+
+ IgniteException ex = null;
+
+ for (int[] idxs : grids) {
+ for (int idx : idxs) {
+ try {
+ int cnt = tryPut(idx);
+
+ if (ex != null)
+ throw new AssertionError("Successful tryPut after failure [gridIdx=" + idx +
+ ", sucessful puts = " + cnt + ']', ex);
+
+ putCnt += cnt;
+ }
+ catch (Exception e) {
+ if (ex == null)
+ ex = new IgniteException("Failed to put entry");
+
+ ex.addSuppressed(e);
}
}
}
+ if (ex != null)
+ throw ex;
return putCnt;
}
@@ -318,20 +511,21 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
* Prevents cache from performing any operation if only nodes from single data center are left in topology.
*/
private static class SplitAwareTopologyValidator implements TopologyValidator {
+
/** */
private static final long serialVersionUID = 0L;
/** */
@CacheNameResource
- private String cacheName;
+ private transient String cacheName;
/** */
@IgniteInstanceResource
- private Ignite ignite;
+ private transient Ignite ignite;
/** */
@LoggerResource
- private IgniteLogger log;
+ private transient IgniteLogger log;
/** State. */
private transient State state;
@@ -340,12 +534,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
@Override public boolean validate(Collection<ClusterNode> nodes) {
initIfNeeded(nodes);
- if (!F.view(nodes, new IgnitePredicate<ClusterNode>() {
+ for (ClusterNode node : F.view(nodes, new IgnitePredicate<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
return !node.isClient() && node.attribute(DC_NODE_ATTR) == null;
}
- }).isEmpty()) {
- log.error("No valid server nodes are detected in topology: [cacheName=" + cacheName + ']');
+ })) {
+ log.error("Not valid server nodes are detected in topology: [cacheName=" + cacheName + ", node=" +
+ node + ']');
return false;
}
@@ -353,7 +548,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
boolean segmented = segmented(nodes);
if (!segmented)
- state = State.VALID; // Also clears possible REPAIRED state.
+ state = State.VALID; // Also clears possible BEFORE_REPAIRED and REPAIRED states.
else {
if (state == State.REPAIRED) // Any topology change in segmented grid in repaired mode is valid.
return true;
@@ -361,23 +556,40 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
// Find discovery event node.
ClusterNode evtNode = evtNode(nodes);
- if (activator(evtNode)) {
- if (log.isInfoEnabled())
- log.info("Grid segmentation is repaired: [cacheName=" + cacheName + ']');
-
- state = State.REPAIRED;
- }
+ if (activator(evtNode))
+ state = State.BEFORE_REPARED;
else {
- if (state == State.VALID) {
- if (log.isInfoEnabled())
- log.info("Grid segmentation is detected: [cacheName=" + cacheName + ']');
+ if (state == State.BEFORE_REPARED) {
+ boolean activatorLeft = true;
+
+ // Check if activator is no longer in topology.
+ for (ClusterNode node : nodes) {
+ if (node.isClient() && activator(node)) {
+ activatorLeft = false;
+
+ break;
+ }
+ }
+
+ if (activatorLeft) {
+ if (log.isInfoEnabled())
+ log.info("Grid segmentation is repaired: [cacheName=" + cacheName + ']');
+
+ state = State.REPAIRED; // Switch to REPAIRED state only when activator leaves.
+ } // Else stay in BEFORE_REPARED state.
}
+ else {
+ if (state == State.VALID) {
+ if (log.isInfoEnabled())
+ log.info("Grid segmentation is detected: [cacheName=" + cacheName + ']');
+ }
- state = State.NOTVALID;
+ state = State.NOTVALID;
+ }
}
}
- return state != State.NOTVALID;
+ return state == State.VALID || state == State.REPAIRED;
}
/** */
@@ -418,7 +630,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
// Search for activator node in history on start.
long topVer = evtNode(nodes).order();
- while(topVer > 0) {
+ while (topVer > 0) {
Collection<ClusterNode> top = ignite.cluster().topology(topVer--);
// Stop on reaching history limit.
@@ -460,11 +672,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
/** States. */
private enum State {
- /** Topology valid. */
+ /** Topology is valid. */
VALID,
- /** Topology not valid */
+ /** Topology is not valid */
NOTVALID,
- /** Topology repaired (valid) */
+ /** Activator has joined, topology is not yet repaired (not valid until activator leaves) */
+ BEFORE_REPARED,
+ /** Topology is repaired (valid) */
REPAIRED;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
new file mode 100644
index 0000000..196681d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Abstract base class for tests that split the topology into two halves.
+ */
+public abstract class IgniteCacheTopologySplitAbstractTest extends GridCommonAbstractTest {
+
+ /** Segmentation state. */
+ private volatile boolean segmented;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setFailureDetectionTimeout(3_000L);
+
+ cfg.setDiscoverySpi(new SplitTcpDiscoverySpi());
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /**
+ * Trigger segmentation and wait for results. Should be called on stable topology.
+ *
+ * @throws InterruptedException If interrupted while waiting.
+ * @throws IgniteCheckedException On error.
+ */
+ protected void splitAndWait() throws InterruptedException, IgniteCheckedException {
+ if (log.isInfoEnabled())
+ log.info(">>> Simulating split");
+
+ long topVer = grid(0).cluster().topologyVersion();
+
+ // Trigger segmentation.
+ segmented = true;
+
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi)
+ ignite.configuration().getCommunicationSpi();
+
+ comm.blockMessages(new SegmentBlocker(ignite.cluster().localNode()));
+ }
+
+ Collection<Ignite> seg0 = F.view(G.allGrids(), new IgnitePredicate<Ignite>() {
+ @Override public boolean apply(Ignite ignite) {
+ return segment(ignite.cluster().localNode()) == 0;
+ }
+ });
+
+ Collection<Ignite> seg1 = F.view(G.allGrids(), new IgnitePredicate<Ignite>() {
+ @Override public boolean apply(Ignite ignite) {
+ return segment(ignite.cluster().localNode()) == 1;
+ }
+ });
+
+ for (Ignite grid : seg0)
+ ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg1.size()).get();
+
+ for (Ignite grid : seg1)
+ ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg0.size()).get();
+
+ // awaitPartitionMapExchange won't work because coordinator is wrong for second segment.
+ for (Ignite grid : G.allGrids())
+ ((IgniteKernal)grid).context().cache().context().exchange().lastTopologyFuture().get();
+
+ if (log.isInfoEnabled())
+ log.info(">>> Finished waiting for split");
+ }
+
+ /**
+ * Restore initial state
+ */
+ protected void unsplit() {
+ if (log.isInfoEnabled())
+ log.info(">>> Restoring from split");
+
+ segmented = false;
+
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi)
+ ignite.configuration().getCommunicationSpi();
+
+ comm.stopBlock();
+ }
+ }
+
+ /**
+ * @return Segmented status.
+ */
+ protected boolean segmented() {
+ return segmented;
+ }
+
+ /**
+ * Defines split matrix.
+ *
+ * @param locPort Local port.
+ * @param rmtPort Rmt port.
+ * @return {@code true} if link is blocked.
+ */
+ protected abstract boolean isBlocked(int locPort, int rmtPort);
+
+ /**
+ * Defines instance segment: 0 or 1.
+ *
+ * @param node Node.
+ * @return Index of instance segment.
+ */
+ protected abstract int segment(ClusterNode node);
+
+ /**
+ * Discovery SPI which can simulate network split.
+ */
+ protected class SplitTcpDiscoverySpi extends TcpDiscoverySpi {
+ /**
+ * @param sockAddr Remote socket address.
+ * @return Segmented status.
+ */
+ protected boolean segmented(InetSocketAddress sockAddr) {
+ if (!segmented)
+ return false;
+
+ int rmtPort = sockAddr.getPort();
+
+ boolean b = isBlocked(getLocalPort(), rmtPort);
+
+ if (b && log.isDebugEnabled())
+ log.debug("Block cross-segment communication [locPort=" + getLocalPort() + ", rmtPort=" + rmtPort + ']');
+
+ return b;
+ }
+
+ /**
+ * @param sockAddr Socket address.
+ * @param timeout Socket timeout.
+ * @throws SocketTimeoutException If segmented.
+ */
+ protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws SocketTimeoutException {
+ if (segmented(sockAddr)) {
+ if (timeout > 0) {
+ try {
+ Thread.sleep(timeout);
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ }
+
+ throw new SocketTimeoutException("Fake socket timeout.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+ checkSegmented(remAddr, timeoutHelper.nextTimeoutChunk(getSocketTimeout()));
+
+ return super.openSocket(sock, remAddr, timeoutHelper);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(
+ Socket sock,
+ TcpDiscoveryAbstractMessage msg,
+ byte[] data,
+ long timeout
+ ) throws IOException {
+ checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock,
+ OutputStream out,
+ TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(
+ Socket sock,
+ TcpDiscoveryAbstractMessage msg,
+ long timeout
+ ) throws IOException, IgniteCheckedException {
+ checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+ long timeout) throws IOException {
+ checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+ }
+
+ /** */
+ protected class SegmentBlocker implements IgniteBiPredicate<ClusterNode, Message> {
+ /** */
+ private final ClusterNode locNode;
+
+ /**
+ * @param locNode Local node.
+ */
+ SegmentBlocker(ClusterNode locNode) {
+ assert locNode != null;
+
+ this.locNode = locNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node, Message message) {
+ return segment(locNode) != segment(node);
+ }
+ }
+}
\ No newline at end of file