You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/02 12:58:59 UTC
[45/46] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0f7c32c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0f7c32c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0f7c32c
Branch: refs/heads/ignite-1093-2
Commit: f0f7c32caa1a9566c77d0a66bb533a4a07a2338a
Parents: 9abfc60
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 29 18:20:01 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 29 18:20:01 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 196 +++++++++++++------
.../GridCacheRebalancingSyncSelfTest.java | 30 ++-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 17 --
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 46 +++++
.../testframework/junits/GridAbstractTest.java | 3 +-
6 files changed, 194 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 5d4db40..d1d475c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -818,7 +818,7 @@ public class GridDhtPartitionDemander {
@Override public void apply(IgniteInternalFuture<Long> future) {
SyncFuture.this.cancel();
}
- });
+ }); // todo: is it necessary?
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 81e2fa4..b5bb25d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -41,15 +40,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T4;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
/**
@@ -71,8 +67,8 @@ class GridDhtPartitionSupplier {
/** Preload predicate. */
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
- /** Supply context map. T4: nodeId, idx, topologyVersion, updateSequence. */
- private final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> scMap =
+ /** Supply context map. T2: nodeId, idx. */
+ private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap =
new ConcurrentHashMap8<>();
/** Rebalancing listener. */
@@ -100,26 +96,18 @@ class GridDhtPartitionSupplier {
lsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
if (evt instanceof DiscoveryEvent) {
- for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) {
- T4<UUID, Integer, AffinityTopologyVersion, Long> t = entry.getKey();
+ for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
+ T2<UUID, Integer> t = entry.getKey();
- SupplyContext sc = entry.getValue();
+ if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) {
+ SupplyContext sctx = entry.getValue();
- if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
- clearContext(scMap, t, sc, log);
- }
- }
- else if (evt instanceof CacheRebalancingEvent) {
- CacheRebalancingEvent e = (CacheRebalancingEvent)evt;
+ clearContext(sctx, log);
- if (cctx.name().equals(e.cacheName())) {
- UUID id = e.discoveryNode().id();
+ U.log(log, "Supply context removed for failed node [node=" + t.get1() + "]");
- for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) {
- if (id.equals(entry.getKey().get1()))
- clearContext(scMap, entry.getKey(), entry.getValue(), log);
+ scMap.remove(t, sctx);
}
-
}
}
else {
@@ -128,7 +116,7 @@ class GridDhtPartitionSupplier {
}
};
- cctx.events().addListener(lsnr, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+ cctx.events().addListener(lsnr, EVT_NODE_FAILED);
startOldListeners();
}
@@ -145,32 +133,38 @@ class GridDhtPartitionSupplier {
/**
* Clear context.
*
- * @param map Context map.
- * @param t id.
* @param sc Supply context.
* @param log Logger.
* @return true in case context was removed.
*/
- private static boolean clearContext(
- final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> map,
- final T4<UUID, Integer, AffinityTopologyVersion, Long> t,
+ private static void clearContext(
final SupplyContext sc,
final IgniteLogger log) {
- final Iterator it = sc.entryIt;
+ if (sc != null) {
+ final Iterator it = sc.entryIt;
- if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
- try {
- synchronized (it) {
- if (!((GridCloseableIterator)it).isClosed())
- ((GridCloseableIterator)it).close();
+ if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
+ try {
+ synchronized (it) {
+ if (!((GridCloseableIterator)it).isClosed())
+ ((GridCloseableIterator)it).close();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Iterator close failed.", e);
}
}
- catch (IgniteCheckedException e) {
- log.error("Iterator close failed.", e);
+
+ final GridDhtLocalPartition loc = sc.loc;
+
+ if (loc != null && loc.reservations() > 0) {
+ synchronized (loc) {
+ if (loc.reservations() > 0)
+ loc.release();
+ }
+
}
}
-
- return map.remove(t, sc);
}
/**
@@ -199,10 +193,16 @@ class GridDhtPartitionSupplier {
ClusterNode node = cctx.discovery().node(id);
- T4<UUID, Integer, AffinityTopologyVersion, Long> scId = new T4<>(id, idx, d.topologyVersion(), d.updateSequence());
+ T2<UUID, Integer> scId = new T2<>(id, idx);
try {
- SupplyContext sctx = scMap.get(scId);
+ SupplyContext sctx = scMap.remove(scId);
+
+ if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) {
+ clearContext(sctx, log);
+
+ sctx = null;
+ }
if (sctx == null && d.partitions() == null)
return;
@@ -230,18 +230,27 @@ class GridDhtPartitionSupplier {
newReq = false;
- GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+ GridDhtLocalPartition loc;
- if (loc == null || loc.state() != OWNING || !loc.reserve()) {
- // Reply with partition of "-1" to let sender know that
- // this node is no longer an owner.
- s.missed(part);
+ if (sctx != null && sctx.loc != null) {
+ loc = sctx.loc;
- if (log.isDebugEnabled())
- log.debug("Requested partition is not owned by local node [part=" + part +
- ", demander=" + id + ']');
+ assert loc.reservations() > 0;
+ }
+ else {
+ loc = top.localPartition(part, d.topologyVersion(), false);
- continue;
+ if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+ // Reply with partition of "-1" to let sender know that
+ // this node is no longer an owner.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Requested partition is not owned by local node [part=" + part +
+ ", demander=" + id + ']');
+
+ continue;
+ }
}
GridCacheEntryInfoCollectSwapListener swapLsnr = null;
@@ -279,9 +288,18 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+ saveSupplyContext(scId,
+ phase,
+ partIt,
+ part,
+ entIt,
+ swapLsnr,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
swapLsnr = null;
+ loc = null;
reply(node, d, s);
@@ -323,7 +341,10 @@ class GridDhtPartitionSupplier {
partIt,
null,
swapLsnr,
- part);
+ part,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
}
}
@@ -354,9 +375,18 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+ saveSupplyContext(scId,
+ phase,
+ partIt,
+ part,
+ iter,
+ swapLsnr,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
swapLsnr = null;
+ loc = null;
reply(node, d, s);
@@ -437,7 +467,10 @@ class GridDhtPartitionSupplier {
partIt,
null,
null,
- part);
+ part,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
}
}
@@ -465,9 +498,17 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
-
- swapLsnr = null;
+ saveSupplyContext(scId,
+ phase,
+ partIt,
+ part,
+ lsnrIt,
+ swapLsnr,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
+
+ loc = null;
reply(node, d, s);
@@ -500,7 +541,8 @@ class GridDhtPartitionSupplier {
sctx = null;
}
finally {
- loc.release();
+ if (loc != null)
+ loc.release();
if (swapLsnr != null) {
cctx.swap().removeOffHeapListener(part, swapLsnr);
@@ -561,12 +603,24 @@ class GridDhtPartitionSupplier {
* @param swapLsnr Swap listener.
*/
private void saveSupplyContext(
- T4 t,
+ T2<UUID, Integer> t,
int phase,
Iterator<Integer> partIt,
int part,
- Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
- scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr,
+ GridDhtLocalPartition loc,
+ AffinityTopologyVersion topVer,
+ long updateSeq) {
+ SupplyContext old = scMap.putIfAbsent(t, new SupplyContext(phase,
+ partIt,
+ entryIt,
+ swapLsnr,
+ part,
+ loc,
+ topVer,
+ updateSeq));
+
+ assert old == null;
}
/**
@@ -588,6 +642,15 @@ class GridDhtPartitionSupplier {
/** Partition. */
private final int part;
+ /** Local partition. */
+ GridDhtLocalPartition loc;
+
+ /** Topology version. */
+ AffinityTopologyVersion topVer;
+
+ /** Update seq. */
+ long updateSeq;
+
/**
* @param phase Phase.
* @param partIt Partition iterator.
@@ -595,13 +658,22 @@ class GridDhtPartitionSupplier {
* @param swapLsnr Swap listener.
* @param part Partition.
*/
- public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
- GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+ public SupplyContext(int phase,
+ Iterator<Integer> partIt,
+ Iterator<?> entryIt,
+ GridCacheEntryInfoCollectSwapListener swapLsnr,
+ int part,
+ GridDhtLocalPartition loc,
+ AffinityTopologyVersion topVer,
+ long updateSeq) {
this.phase = phase;
this.partIt = partIt;
this.entryIt = entryIt;
this.swapLsnr = swapLsnr;
this.part = part;
+ this.loc = loc;
+ this.topVer = topVer;
+ this.updateSeq = updateSeq;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index be8e24b..bb40f31 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -59,18 +60,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
/** */
- private volatile boolean concurrentStartFinished = false;
+ private volatile boolean concurrentStartFinished;
/** */
- private volatile boolean concurrentStartFinished2 = false;
-
- private volatile FailableTcpDiscoverySpi spi;
-
- public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi {
- public void fail() {
- simulateNodeFailure();
- }
- }
+ private volatile boolean concurrentStartFinished2;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -78,11 +71,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
iCfg.setRebalanceThreadPoolSize(4);
- iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
-
- if (getTestGridName(20).equals(gridName))
- spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
-
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
@@ -96,7 +84,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
cachePCfg.setBackups(1);
cachePCfg.setRebalanceBatchSize(1);
- cachePCfg.setRebalanceBatchesCount(1);
+ //cachePCfg.setRebalanceBatchesCount(1);
+ cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
@@ -285,6 +274,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
long start = System.currentTimeMillis();
+ concurrentStartFinished = false;
+ concurrentStartFinished2 = false;
+
Thread t1 = new Thread() {
@Override public void run() {
try {
@@ -409,11 +401,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(1, 2);
- startGrid(20);
+ startGrid(2);
- waitForRebalancing(20, 3);
+ waitForRebalancing(2, 3);
- spi.fail();
+ ((TestTcpDiscoverySpi)grid(2).configuration().getDiscoverySpi()).simulateNodeFailure();
waitForRebalancing(0, 4);
waitForRebalancing(1, 4);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0280e9c..51d8a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
- *
- */
- private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
- /** */
- private boolean ignorePingResponse;
-
- /** {@inheritDoc} */
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
- IgniteCheckedException {
- if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
- return;
- else
- super.writeToSocket(sock, msg, timeout);
- }
- }
-
- /**
* @throws Exception If any error occurs.
*/
public void testNodeAdded() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
new file mode 100644
index 0000000..dbc54bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+
+/**
+ *
+ */
+public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ public boolean ignorePingResponse;
+
+ /** {@inheritDoc} */
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+ IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
+ return;
+ else
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void simulateNodeFailure() {
+ super.simulateNodeFailure();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f54fe06..546549b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -76,6 +76,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -1227,7 +1228,7 @@ public abstract class GridAbstractTest extends TestCase {
cfg.setCommunicationSpi(commSpi);
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
if (isDebug()) {
discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);