You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/04/28 16:08:23 UTC
ignite git commit: IGNITE-7918 Huge memory leak when data streamer
used together with local cache. - Fixes #3778.
Repository: ignite
Updated Branches:
refs/heads/master 5135f82f3 -> 09002f2e0
IGNITE-7918 Huge memory leak when data streamer used together with local cache. - Fixes #3778.
Signed-off-by: dspavlov <dp...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09002f2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09002f2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09002f2e
Branch: refs/heads/master
Commit: 09002f2e05629a1c71443ed5e135ea125f0e7722
Parents: 5135f82
Author: Andrei Aleksandrov <ae...@gmail.com>
Authored: Sat Apr 28 19:07:55 2018 +0300
Committer: dspavlov <dp...@gridgain.com>
Committed: Sat Apr 28 19:07:55 2018 +0300
----------------------------------------------------------------------
.../affinity/GridAffinityAssignmentCache.java | 58 +++---
.../affinity/GridAffinityProcessor.java | 81 +++++++-
.../affinity/AffinityHistoryCleanupTest.java | 86 ++++----
.../GridAffinityProcessorMemoryLeakTest.java | 202 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
5 files changed, 346 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index e420977..34e2b0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -122,9 +122,6 @@ public class GridAffinityAssignmentCache {
/** Node stop flag. */
private volatile IgniteCheckedException stopErr;
- /** History size ignoring client events changes. */
- private final AtomicInteger histSize = new AtomicInteger();
-
/** Full history size. */
private final AtomicInteger fullHistSize = new AtomicInteger();
@@ -206,11 +203,13 @@ public class GridAffinityAssignmentCache {
*/
public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
+
assert idealAssignment != null;
GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
- affCache.put(topVer, new HistoryAffinityAssignment(assignment));
+ HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignment));
+
head.set(assignment);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -223,7 +222,9 @@ public class GridAffinityAssignmentCache {
}
}
- onHistoryAdded(assignment);
+ // If an existing entry was replaced, there is no need to clean the history.
+ if (hAff == null)
+ onHistoryAdded();
if (log.isTraceEnabled()) {
log.trace("New affinity assignment [grp=" + cacheOrGrpName
@@ -273,6 +274,8 @@ public class GridAffinityAssignmentCache {
affCache.clear();
+ fullHistSize.set(0);
+
head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
stopErr = null;
@@ -484,12 +487,14 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignment aff = head.get();
- assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
- assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
+ assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
+
+ assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff);
- affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
+ HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
+
head.set(assignmentCpy);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -502,7 +507,9 @@ public class GridAffinityAssignmentCache {
}
}
- onHistoryAdded(assignmentCpy);
+ // If an existing entry was replaced, there is no need to clean the history.
+ if (hAff == null)
+ onHistoryAdded();
}
/**
@@ -779,27 +786,15 @@ public class GridAffinityAssignmentCache {
}
/**
- * @param aff Added affinity assignment.
+ * Cleaning the affinity history.
*/
- private void onHistoryAdded(GridAffinityAssignment aff) {
- int fullSize = fullHistSize.incrementAndGet();
-
- int size;
-
- if (aff.clientEventChange())
- size = histSize.get();
- else
- size = histSize.incrementAndGet();
-
- int rmvCnt = size - MAX_HIST_SIZE;
+ private void onHistoryAdded() {
+ if (fullHistSize.incrementAndGet() > MAX_HIST_SIZE) {
+ Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
- if (rmvCnt <= 0) {
- if (fullSize > MAX_HIST_SIZE * 2)
- rmvCnt = MAX_HIST_SIZE;
- }
+ int rmvCnt = MAX_HIST_SIZE / 2;
- if (rmvCnt > 0) {
- Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
+ AffinityTopologyVersion topVerRmv = null;
while (it.hasNext() && rmvCnt > 0) {
AffinityAssignment aff0 = it.next();
@@ -808,11 +803,14 @@ public class GridAffinityAssignmentCache {
rmvCnt--;
- if (!aff0.clientEventChange())
- histSize.decrementAndGet();
-
fullHistSize.decrementAndGet();
+
+ topVerRmv = aff0.topologyVersion();
}
+
+ topVerRmv = it.hasNext() ? it.next().topologyVersion() : topVerRmv;
+
+ ctx.affinity().removeCachedAffinity(topVerRmv);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 128eaf0..e26c0ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -26,9 +26,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -63,7 +64,6 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -86,8 +86,11 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
/** Time to wait between errors (in milliseconds). */
private static final long ERROR_WAIT = 500;
+ /** Log. */
+ private final IgniteLogger log;
+
/** Affinity map. */
- private final ConcurrentMap<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> affMap = new ConcurrentHashMap<>();
+ private final ConcurrentSkipListMap<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> affMap = new ConcurrentSkipListMap<>();
/** Listener. */
private final GridLocalEventListener lsnr = new GridLocalEventListener() {
@@ -131,6 +134,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
*/
public GridAffinityProcessor(GridKernalContext ctx) {
super(ctx);
+
+ log = ctx.log(GridAffinityProcessor.class);
}
/** {@inheritDoc} */
@@ -212,6 +217,34 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
return affInfo != null ? F.first(affInfo.assignment().get(partId)) : null;
}
+ /**
+ * Removes cached affinity instances with affinity topology versions less than {@code topVer}.
+ *
+ * @param topVer topology version.
+ */
+ public void removeCachedAffinity(AffinityTopologyVersion topVer) {
+ assert topVer != null;
+
+ int oldSize = affMap.size();
+
+ Iterator<Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>>> it =
+ affMap.headMap(new AffinityAssignmentKey(topVer)).entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> entry = it.next();
+
+ assert entry.getValue() != null;
+
+ if (!entry.getValue().isDone())
+ continue;
+
+ it.remove();
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Affinity cached values were cleared: " + (oldSize - affMap.size()));
+ }
+
/**
* Maps keys to nodes for given cache.
@@ -358,6 +391,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
@SuppressWarnings("ErrorNotRethrown")
@Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer)
throws IgniteCheckedException {
+
+ assert cacheName != null;
+
AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
@@ -658,7 +694,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
/**
*
*/
- private static class AffinityAssignmentKey {
+ private static class AffinityAssignmentKey implements Comparable<AffinityAssignmentKey> {
/** */
private String cacheName;
@@ -669,11 +705,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
* @param cacheName Cache name.
* @param topVer Topology version.
*/
- private AffinityAssignmentKey(String cacheName, @NotNull AffinityTopologyVersion topVer) {
+ private AffinityAssignmentKey(@NotNull String cacheName, @NotNull AffinityTopologyVersion topVer) {
this.cacheName = cacheName;
this.topVer = topVer;
}
+ /**
+ * This constructor should be used only in removeCachedAffinity to create the special keys used for removal.
+ *
+ * @param topVer Topology version.
+ */
+ private AffinityAssignmentKey(@NotNull AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
@@ -700,6 +745,32 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
@Override public String toString() {
return S.toString(AffinityAssignmentKey.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(AffinityAssignmentKey o) {
+ assert o != null;
+
+ if (this == o)
+ return 0;
+
+ int res = this.topVer.compareTo(o.topVer);
+
+ // Key with null cache name must be less than any key with not null cache name for the same topVer.
+ if (res == 0) {
+ if (cacheName == null && o.cacheName != null)
+ return -1;
+
+ if (cacheName != null && o.cacheName == null)
+ return 1;
+
+ if (cacheName == null && o.cacheName == null)
+ return 0;
+
+ return cacheName.compareTo(o.cacheName);
+ }
+
+ return res;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
index 605cc5f..f89d9ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
@@ -89,34 +89,34 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
Ignite ignite = startGrid(0);
- checkHistory(ignite, F.asList(topVer(1, 0)), 1);
+ checkHistory(ignite, F.asList(topVer(1, 0)), 1); //fullHistSize = 1
startGrid(1);
checkHistory(ignite, F.asList(
- topVer(1, 0),
- topVer(2, 0),
- topVer(2, 1)),
+ topVer(1, 0), // FullHistSize = 1.
+ topVer(2, 0), // FullHistSize = 2.
+ topVer(2, 1)), // FullHistSize = 3.
3);
startGrid(2);
checkHistory(ignite, F.asList(
- topVer(1, 0),
- topVer(2, 0),
- topVer(2, 1),
- topVer(3, 0),
- topVer(3, 1)),
+ topVer(1, 0), // FullHistSize = 1.
+ topVer(2, 0), // FullHistSize = 2.
+ topVer(2, 1), // FullHistSize = 3.
+ topVer(3, 0), // FullHistSize = 4.
+ topVer(3, 1)), // FullHistSize = 5.
5);
startGrid(3);
checkHistory(ignite, F.asList(
- topVer(2, 1),
- topVer(3, 0),
- topVer(3, 1),
- topVer(4, 0),
- topVer(4, 1)),
+ topVer(2, 1), // FullHistSize = 3.
+ topVer(3, 0), // FullHistSize = 4.
+ topVer(3, 1), // FullHistSize = 5.
+ topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(4, 1)), // FullHistSize = 5.
5);
client = true;
@@ -126,13 +126,11 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
stopGrid(4);
checkHistory(ignite, F.asList(
- topVer(2, 1),
- topVer(3, 0),
- topVer(3, 1),
- topVer(4, 0),
- topVer(4, 1),
- topVer(5, 0),
- topVer(6, 0)),
+ topVer(3, 1), // FullHistSize = 5.
+ topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(4, 1), // FullHistSize = 5.
+ topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(6, 0)), // FullHistSize = 5.
5);
startGrid(4);
@@ -140,15 +138,11 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
stopGrid(4);
checkHistory(ignite, F.asList(
- topVer(2, 1),
- topVer(3, 0),
- topVer(3, 1),
- topVer(4, 0),
- topVer(4, 1),
- topVer(5, 0),
- topVer(6, 0),
- topVer(7, 0),
- topVer(8, 0)),
+ topVer(4, 1), // FullHistSize = 5.
+ topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(6, 0), // FullHistSize = 5.
+ topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(8, 0)), // FullHistSize = 5.
5);
startGrid(4);
@@ -156,28 +150,24 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
stopGrid(4);
checkHistory(ignite, F.asList(
- topVer(5, 0),
- topVer(6, 0),
- topVer(7, 0),
- topVer(8, 0),
- topVer(9, 0),
- topVer(10, 0)),
- 0);
+ topVer(6, 0), // FullHistSize = 5.
+ topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(8, 0), // FullHistSize = 5.
+ topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(10, 0)), // FullHistSize = 5.
+ 5);
client = false;
startGrid(4);
checkHistory(ignite, F.asList(
- topVer(5, 0),
- topVer(6, 0),
- topVer(7, 0),
- topVer(8, 0),
- topVer(9, 0),
- topVer(10, 0),
- topVer(11, 0),
- topVer(11, 1)),
- 2);
+ topVer(8, 0), // FullHistSize = 5.
+ topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(10, 0), // FullHistSize = 5.
+ topVer(11, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(11, 1)), // FullHistSize = 5.
+ 5);
}
finally {
if (histProp != null)
@@ -203,9 +193,9 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
for (GridCacheContext cctx : proc.context().cacheContexts()) {
GridAffinityAssignmentCache aff = GridTestUtils.getFieldValue(cctx.affinity(), "aff");
- AtomicInteger histSize = GridTestUtils.getFieldValue(aff, "histSize");
+ AtomicInteger fullHistSize = GridTestUtils.getFieldValue(aff, "fullHistSize");
- assertEquals(expSize, histSize.get());
+ assertEquals(expSize, fullHistSize.get());
Map<AffinityTopologyVersion, Object> cache = GridTestUtils.getFieldValue(aff, "affCache");
http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
new file mode 100644
index 0000000..3b6857d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
+/**
+ * Tests for {@link GridAffinityProcessor}.
+ */
+@GridCommonTest(group = "Affinity Processor")
+public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest {
+ /** Max affinity history size. Should be the same as GridAffinityAssignmentCache.MAX_HIST_SIZE. */
+ private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100);
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setForceServerMode(true);
+
+ discoSpi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setName(CACHE_NAME);
+
+ cacheCfg.setStoreKeepBinary(true);
+
+ cacheCfg.setCacheMode(CacheMode.LOCAL);
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Test affinity functions caching and clean up.
+ *
+ * @throws Exception In case of any exception.
+ */
+ public void testAffinityProcessor() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ IgniteKernal grid = (IgniteKernal)grid(0);
+
+ IgniteCache<String, String> cache;
+
+ IgniteCache<String, String> globalCache = getOrCreateGlobalCache(ignite);
+
+ IgniteDataStreamer<String, String> globalStreamer;
+
+ int count = MAX_HIST_SIZE * 4;
+
+ int size;
+
+ do {
+ try {
+ cache = createLocalCache(ignite, count);
+
+ cache.put("Key" + count, "Value" + count);
+
+ cache.destroy();
+
+ globalStreamer = createGlobalStreamer(ignite, globalCache);
+
+ globalStreamer.addData("GlobalKey" + count, "GlobalValue" + count);
+
+ globalStreamer.flush();
+
+ globalStreamer.close();
+
+ size = ((ConcurrentSkipListMap)GridTestUtils.getFieldValue(grid.context().affinity(), "affMap")).size();
+
+ assertTrue("Cache has size that bigger then expected [size=" + size + "" +
+ ", expLimit=" + MAX_HIST_SIZE * 3 + "]", size < MAX_HIST_SIZE * 3);
+ }
+ catch (Exception e) {
+ fail("Error was handled [" + e.getMessage() + "]");
+ }
+ }
+ while (count-- > 0);
+ }
+
+ /**
+ * Creates local cache.
+ *
+ * @param ignite instance of {@code Ignite}.
+ * @param id unique id for local cache.
+ * @return local cache instance.
+ */
+ private static IgniteCache<String, String> createLocalCache(Ignite ignite, long id) {
+ final String cacheName = "localCache" + id;
+
+ final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>();
+
+ cCfg.setName(cacheName);
+
+ cCfg.setCacheMode(CacheMode.LOCAL);
+
+ cCfg.setGroupName("some group");
+
+ ignite.destroyCache(cacheName); // Local cache is not really local - reference can be kept by other nodes if restart during the load happens.
+
+ return ignite.createCache(cCfg).withKeepBinary();
+ }
+
+ /**
+ * Gets or creates global cache.
+ *
+ * @param ignite instance of {@code Ignite}.
+ * @return global cache instance.
+ */
+ private static IgniteCache<String, String> getOrCreateGlobalCache(Ignite ignite) {
+ final String cacheName = "GlobalCache";
+
+ final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>();
+
+ cCfg.setName(cacheName);
+
+ cCfg.setStoreKeepBinary(true);
+
+ cCfg.setCacheMode(CacheMode.PARTITIONED);
+
+ cCfg.setOnheapCacheEnabled(false);
+
+ cCfg.setCopyOnRead(false);
+
+ cCfg.setBackups(0);
+
+ cCfg.setWriteBehindEnabled(false);
+
+ cCfg.setReadThrough(false);
+
+ return ignite.getOrCreateCache(cCfg).withKeepBinary();
+ }
+
+ /**
+ * Creates streamer for global cache.
+ *
+ * @param ignite instance of {@code Ignite}.
+ * @param cache instance of global cache.
+ * @return instance of {@code IgniteDataStreamer}.
+ */
+ private static IgniteDataStreamer<String, String> createGlobalStreamer(Ignite ignite,
+ IgniteCache<String, String> cache) {
+ IgniteDataStreamer<String, String> streamer = ignite.dataStreamer(cache.getName());
+
+ streamer.allowOverwrite(true);
+
+ streamer.skipStore(true);
+
+ streamer.keepBinary(false);
+
+ return streamer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index e71a569..cc93c5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
import org.apache.ignite.internal.MarshallerContextLockingSelfTest;
import org.apache.ignite.internal.TransactionsMXBeanImplTest;
import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest;
+import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorMemoryLeakTest;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest;
import org.apache.ignite.internal.processors.cache.GridLocalIgniteSerializationTest;
import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest;
@@ -133,6 +134,7 @@ public class IgniteBasicTestSuite extends TestSuite {
GridTestUtils.addTestIfNeeded(suite, GridReleaseTypeSelfTest.class, ignoredTests);
suite.addTestSuite(GridProductVersionSelfTest.class);
suite.addTestSuite(GridAffinityProcessorRendezvousSelfTest.class);
+ suite.addTestSuite(GridAffinityProcessorMemoryLeakTest.class);
suite.addTestSuite(GridClosureProcessorSelfTest.class);
suite.addTestSuite(GridClosureProcessorRemoteTest.class);
suite.addTestSuite(GridClosureSerializationTest.class);