You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/08 16:53:51 UTC
[50/50] incubator-ignite git commit: # ignite-929
# ignite-929
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b6c7eaee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b6c7eaee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b6c7eaee
Branch: refs/heads/ignite-929
Commit: b6c7eaee3f7f637e75db8fa9ce43822b7bff93bb
Parents: 5e5be0c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 8 17:15:04 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 8 17:50:38 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 6 +--
.../processors/cache/GridCacheProcessor.java | 12 +++--
.../GridDhtPartitionsExchangeFuture.java | 9 +++-
.../affinity/IgniteClientNodeAffinityTest.java | 16 ++++--
...cheStoreSessionListenerAbstractSelfTest.java | 55 ++++++++++++--------
.../GridCacheTxLoadFromStoreOnLockSelfTest.java | 1 +
.../cache/CacheOffheapMapEntrySelfTest.java | 21 ++++----
.../cache/CacheStopAndDestroySelfTest.java | 23 ++++----
.../cache/IgniteDynamicCacheStartSelfTest.java | 28 ++++++----
...teCacheClientNodePartitionsExchangeTest.java | 5 ++
.../DataStreamerMultinodeCreateCacheTest.java | 1 +
11 files changed, 115 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index ba996d4..1090fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -546,14 +546,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* Closes cache.
* For local cache equivalent to {@link #destroy()}.
* For distributed caches, if called on clients, closes client cache, if called on a server node,
- * do nothing.
+ * does nothing.
*/
- @Override void close();
+ @Override public void close();
/**
* Completely deletes the cache with all its data from the system on all cluster nodes.
*/
- void destroy();
+ public void destroy();
/**
* This cache node to re-balance its partitions. This method is usually used when
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 88a7dcf..e494cd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1394,6 +1394,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
if (desc != null) {
+ if (req.close()) {
+ assert req.initiatingNodeId() != null : req;
+
+ return true;
+ }
+
if (desc.deploymentId().equals(req.deploymentId())) {
if (req.start())
return !desc.cancelled();
@@ -1526,7 +1532,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Request.
*/
private void stopGateway(DynamicCacheChangeRequest req) {
- assert req.stop() || req.close();
+ assert req.stop() || req.close() : req;
// Break the proxy before exchange future is done.
IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName()));
@@ -1539,7 +1545,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Stop request.
*/
public void prepareCacheStop(DynamicCacheChangeRequest req) {
- assert req.stop() || req.close();
+ assert req.stop() || req.close() : req;
GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName()));
@@ -2230,8 +2236,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
prepareCacheStop(req);
}
- // Renew deployment id to have no race condition with start after stop.
- desc.deploymentId(IgniteUuid.randomUuid());
ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a3b870c..bae55ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -510,7 +510,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
ClusterNode node = discoEvt.eventNode();
// Client need to initialize affinity for local join event or for stated client caches.
- if (!node.isLocal()) {
+ if (!node.isLocal() || clientCacheClose()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -840,6 +840,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return {@code True} if exchange initiated for client cache close.
+ */
+ private boolean clientCacheClose() {
+ return reqs != null && reqs.size() == 1 && reqs.iterator().next().close();
+ }
+
+ /**
*
*/
private void dumpPendingObjects() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
index 4244cae..da27fb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
@@ -127,17 +127,23 @@ public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest {
ccfg.setNodeFilter(new TestNodesFilter());
- IgniteCache<Integer, Integer> cache = client.createCache(ccfg);
+ IgniteCache<Integer, Integer> cache = client.createCache(ccfg);
+ try {
checkCache(null, 1);
-
- cache.destroy();
+ }
+ finally {
+ cache.destroy();
+ }
cache = client.createCache(ccfg, new NearCacheConfiguration());
+ try {
checkCache(null, 1);
-
- cache.destroy();
+ }
+ finally {
+ cache.destroy();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
index 2b0d270..19b8dc2 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
@@ -114,12 +114,16 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.ATOMIC);
IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
- cache.loadCache(null);
- cache.get(1);
- cache.put(1, 1);
- cache.remove(1);
- cache.destroy();
+ try {
+ cache.loadCache(null);
+ cache.get(1);
+ cache.put(1, 1);
+ cache.remove(1);
+ }
+ finally {
+ cache.destroy();
+ }
assertEquals(3, loadCacheCnt.get());
assertEquals(1, loadCnt.get());
@@ -135,12 +139,16 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
- cache.loadCache(null);
- cache.get(1);
- cache.put(1, 1);
- cache.remove(1);
- cache.destroy();
+ try {
+ cache.loadCache(null);
+ cache.get(1);
+ cache.put(1, 1);
+ cache.remove(1);
+ }
+ finally {
+ cache.destroy();
+ }
assertEquals(3, loadCacheCnt.get());
assertEquals(1, loadCnt.get());
@@ -157,6 +165,7 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
+
try (Transaction tx = ignite(0).transactions().txStart()) {
cache.put(1, 1);
cache.put(2, 2);
@@ -165,8 +174,9 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
tx.commit();
}
-
- cache.destroy();
+ finally {
+ cache.destroy();
+ }
assertEquals(2, writeCnt.get());
assertEquals(2, deleteCnt.get());
@@ -191,9 +201,10 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
tx.commit();
}
-
- cache1.destroy();
- cache2.destroy();
+ finally {
+ cache1.destroy();
+ cache2.destroy();
+ }
assertEquals(2, writeCnt.get());
assertEquals(2, deleteCnt.get());
@@ -218,9 +229,10 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
tx.commit();
}
-
- cache1.destroy();
- cache2.destroy();
+ finally {
+ cache1.destroy();
+ cache2.destroy();
+ }
try (Connection conn = DriverManager.getConnection(URL)) {
checkTable(conn, 1, false);
@@ -256,9 +268,10 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
assertEquals("Expected failure.", we.getMessage());
}
-
- cache1.destroy();
- cache2.destroy();
+ finally {
+ cache1.destroy();
+ cache2.destroy();
+ }
try (Connection conn = DriverManager.getConnection(URL)) {
checkTable(conn, 1, true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
index eae07f9..bc6b443 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
@@ -117,6 +117,7 @@ public class GridCacheTxLoadFromStoreOnLockSelfTest extends GridCommonAbstractTe
assertEquals(0, cache.size());
}
}
+
cache.destroy();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
index 7630582..f4d7607 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
@@ -151,20 +151,23 @@ public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
IgniteCache jcache = grid(0).getOrCreateCache(cfg);
- GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName());
+ try {
+ GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName());
- Integer key = primaryKey(grid(0).cache(null));
+ Integer key = primaryKey(grid(0).cache(null));
- cache.put(key, "val");
+ cache.put(key, "val");
- GridCacheEntryEx entry = cache.entryEx(key);
+ GridCacheEntryEx entry = cache.entryEx(key);
- entry.unswap(true);
+ entry.unswap(true);
- assertNotNull(entry);
+ assertNotNull(entry);
- assertEquals(entry.getClass(), entryCls);
-
- jcache.destroy();
+ assertEquals(entry.getClass(), entryCls);
+ }
+ finally {
+ jcache.destroy();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
index 79aa563..536ddc3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
@@ -121,9 +121,11 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
private CacheConfiguration getDhtConfig() {
CacheConfiguration cfg = defaultCacheConfiguration();
+
cfg.setName(CACHE_NAME_DHT);
cfg.setCacheMode(CacheMode.PARTITIONED);
cfg.setNearConfiguration(null);
+
return cfg;
}
@@ -132,9 +134,11 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
private CacheConfiguration getClientConfig() {
CacheConfiguration cfg = defaultCacheConfiguration();
+
cfg.setName(CACHE_NAME_CLIENT);
cfg.setCacheMode(CacheMode.PARTITIONED);
cfg.setNearConfiguration(null);
+
return cfg;
}
@@ -143,9 +147,11 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
private CacheConfiguration getNearConfig() {
CacheConfiguration cfg = defaultCacheConfiguration();
+
cfg.setName(CACHE_NAME_NEAR);
cfg.setCacheMode(CacheMode.PARTITIONED);
cfg.setNearConfiguration(new NearCacheConfiguration());
+
return cfg;
}
@@ -154,9 +160,11 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
private CacheConfiguration getLocalConfig() {
CacheConfiguration cfg = defaultCacheConfiguration();
+
cfg.setName(CACHE_NAME_LOC);
cfg.setCacheMode(CacheMode.LOCAL);
cfg.setNearConfiguration(null);
+
return cfg;
}
@@ -535,6 +543,8 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
assert cache1.get(KEY_VAL).equals(curVal);
assert cache2.get(KEY_VAL).equals(curVal);
}
+
+ awaitPartitionMapExchange();
}
}
@@ -730,10 +740,9 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
/**
* Tests concurrent close.
*
- * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException
- * @throws InterruptedException
+ * @throws Exception If failed.
*/
- public void testConcurrentCloseSetWithTry() throws IgniteInterruptedCheckedException, InterruptedException {
+ public void _testConcurrentCloseSetWithTry() throws Exception {
final AtomicInteger a1 = new AtomicInteger();
final AtomicInteger a2 = new AtomicInteger();
final AtomicInteger a3 = new AtomicInteger();
@@ -742,25 +751,21 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
Thread t1 = new Thread(new Runnable() {
@Override public void run() {
closeWithTry(a1, 0);
-
}
});
Thread t2 = new Thread(new Runnable() {
@Override public void run() {
closeWithTry(a2, 0);
-
}
});
Thread t3 = new Thread(new Runnable() {
@Override public void run() {
closeWithTry(a3, 2);
-
}
});
Thread t4 = new Thread(new Runnable() {
@Override public void run() {
closeWithTry(a4, 2);
-
}
});
@@ -825,7 +830,8 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
mgr.destroyCache(cacheName);
- Cache cache = mgr.createCache(cacheName, new MutableConfiguration<Integer, String>().setTypes(Integer.class, String.class));
+ Cache cache = mgr.createCache(cacheName,
+ new MutableConfiguration<Integer, String>().setTypes(Integer.class, String.class));
cache.close();
@@ -837,7 +843,6 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
catch (IllegalStateException e) {
// No-op;
}
-
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index ba3adb8..0143be4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -648,10 +648,13 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
IgniteCache<Object, Object> cache = ignite(0).createCache(cfg);
- for (CountDownLatch start : starts)
- start.await();
-
- cache.destroy();
+ try {
+ for (CountDownLatch start : starts)
+ start.await();
+ }
+ finally {
+ cache.destroy();
+ }
for (CountDownLatch stop : stops)
stop.await();
@@ -696,6 +699,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}
finally {
cache.destroy();
+
stopGrid(nodeCount() + 1);
}
}
@@ -1019,14 +1023,18 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
CacheConfiguration cfg = new CacheConfiguration(DYNAMIC_CACHE_NAME);
IgniteCache cache = ignite(0).createCache(cfg);
- for (int i = 0; i < 100; i++) {
- assertFalse(ignite(0).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(i)
- .contains(dNode.cluster().localNode()));
- cache.put(i, i);
- }
+ try {
+ for (int i = 0; i < 100; i++) {
+ assertFalse(ignite(0).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(i)
+ .contains(dNode.cluster().localNode()));
- cache.destroy();
+ cache.put(i, i);
+ }
+ }
+ finally {
+ cache.destroy();
+ }
}
finally {
stopGrid(nodeCount());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index 5f352e8..801a75b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
@@ -536,10 +537,14 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
AffinityTopologyVersion topVer;
if (!srvNode) {
+ log.info("Close client cache: " + CACHE_NAME1);
+
ignite2.cache(CACHE_NAME1).close();
assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache(CACHE_NAME1));
+ waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 2));
+
assertEquals(0, spi0.partitionsSingleMessages());
assertEquals(0, spi0.partitionsFullMessages());
assertEquals(0, spi1.partitionsSingleMessages());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6c7eaee/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
index 52f3cb7..470ac79 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
@@ -77,6 +77,7 @@ public class DataStreamerMultinodeCreateCacheTest extends GridCommonAbstractTest
String cacheName = "cache-" + threadIdx + "-" + (iter % 10);
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheName);
+
try (IgniteDataStreamer<Object, Object> stmr = ignite.dataStreamer(cacheName)) {
((DataStreamerImpl<Object, Object>)stmr).maxRemapCount(0);