You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/04/28 16:08:23 UTC

ignite git commit: IGNITE-7918 Huge memory leak when data streamer used together with local cache. - Fixes #3778.

Repository: ignite
Updated Branches:
  refs/heads/master 5135f82f3 -> 09002f2e0


IGNITE-7918 Huge memory leak when data streamer used together with local cache. - Fixes #3778.

Signed-off-by: dspavlov <dp...@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09002f2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09002f2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09002f2e

Branch: refs/heads/master
Commit: 09002f2e05629a1c71443ed5e135ea125f0e7722
Parents: 5135f82
Author: Andrei Aleksandrov <ae...@gmail.com>
Authored: Sat Apr 28 19:07:55 2018 +0300
Committer: dspavlov <dp...@gridgain.com>
Committed: Sat Apr 28 19:07:55 2018 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  58 +++---
 .../affinity/GridAffinityProcessor.java         |  81 +++++++-
 .../affinity/AffinityHistoryCleanupTest.java    |  86 ++++----
 .../GridAffinityProcessorMemoryLeakTest.java    | 202 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 5 files changed, 346 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index e420977..34e2b0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -122,9 +122,6 @@ public class GridAffinityAssignmentCache {
     /** Node stop flag. */
     private volatile IgniteCheckedException stopErr;
 
-    /** History size ignoring client events changes. */
-    private final AtomicInteger histSize = new AtomicInteger();
-
     /** Full history size. */
     private final AtomicInteger fullHistSize = new AtomicInteger();
 
@@ -206,11 +203,13 @@ public class GridAffinityAssignmentCache {
      */
     public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
         assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
+
         assert idealAssignment != null;
 
         GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
 
-        affCache.put(topVer, new HistoryAffinityAssignment(assignment));
+        HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignment));
+
         head.set(assignment);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -223,7 +222,9 @@ public class GridAffinityAssignmentCache {
             }
         }
 
-        onHistoryAdded(assignment);
+        // In case if value was replaced there is no sense to clean the history.
+        if (hAff == null)
+            onHistoryAdded();
 
         if (log.isTraceEnabled()) {
             log.trace("New affinity assignment [grp=" + cacheOrGrpName
@@ -273,6 +274,8 @@ public class GridAffinityAssignmentCache {
 
         affCache.clear();
 
+        fullHistSize.set(0);
+
         head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
 
         stopErr = null;
@@ -484,12 +487,14 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment aff = head.get();
 
-        assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT  || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
-        assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT  || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
+        assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
+
+        assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
 
         GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff);
 
-        affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
+        HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
+
         head.set(assignmentCpy);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -502,7 +507,9 @@ public class GridAffinityAssignmentCache {
             }
         }
 
-        onHistoryAdded(assignmentCpy);
+        // In case if value was replaced there is no sense to clean the history.
+        if (hAff == null)
+            onHistoryAdded();
     }
 
     /**
@@ -779,27 +786,15 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
-     * @param aff Added affinity assignment.
+     * Cleaning the affinity history.
      */
-    private void onHistoryAdded(GridAffinityAssignment aff) {
-        int fullSize = fullHistSize.incrementAndGet();
-
-        int size;
-
-        if (aff.clientEventChange())
-            size = histSize.get();
-        else
-            size = histSize.incrementAndGet();
-
-        int rmvCnt = size - MAX_HIST_SIZE;
+    private void onHistoryAdded() {
+        if (fullHistSize.incrementAndGet() > MAX_HIST_SIZE) {
+            Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
 
-        if (rmvCnt <= 0) {
-            if (fullSize > MAX_HIST_SIZE * 2)
-                rmvCnt = MAX_HIST_SIZE;
-        }
+            int rmvCnt = MAX_HIST_SIZE / 2;
 
-        if (rmvCnt > 0) {
-            Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
+            AffinityTopologyVersion topVerRmv = null;
 
             while (it.hasNext() && rmvCnt > 0) {
                 AffinityAssignment aff0 = it.next();
@@ -808,11 +803,14 @@ public class GridAffinityAssignmentCache {
 
                 rmvCnt--;
 
-                if (!aff0.clientEventChange())
-                    histSize.decrementAndGet();
-
                 fullHistSize.decrementAndGet();
+
+                topVerRmv = aff0.topologyVersion();
             }
+
+            topVerRmv = it.hasNext() ? it.next().topologyVersion() : topVerRmv;
+
+            ctx.affinity().removeCachedAffinity(topVerRmv);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 128eaf0..e26c0ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -26,9 +26,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -63,7 +64,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -86,8 +86,11 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     /** Time to wait between errors (in milliseconds). */
     private static final long ERROR_WAIT = 500;
 
+    /** Log. */
+    private final IgniteLogger log;
+
     /** Affinity map. */
-    private final ConcurrentMap<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> affMap = new ConcurrentHashMap<>();
+    private final ConcurrentSkipListMap<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> affMap = new ConcurrentSkipListMap<>();
 
     /** Listener. */
     private final GridLocalEventListener lsnr = new GridLocalEventListener() {
@@ -131,6 +134,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      */
     public GridAffinityProcessor(GridKernalContext ctx) {
         super(ctx);
+
+        log = ctx.log(GridAffinityProcessor.class);
     }
 
     /** {@inheritDoc} */
@@ -212,6 +217,34 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         return affInfo != null ? F.first(affInfo.assignment().get(partId)) : null;
     }
 
+    /**
+     * Removes cached affinity instances with affinity topology versions less than {@code topVer}.
+     *
+     * @param topVer topology version.
+     */
+    public void removeCachedAffinity(AffinityTopologyVersion topVer) {
+        assert topVer != null;
+
+        int oldSize = affMap.size();
+
+        Iterator<Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>>> it =
+            affMap.headMap(new AffinityAssignmentKey(topVer)).entrySet().iterator();
+
+        while (it.hasNext()) {
+            Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> entry = it.next();
+
+            assert entry.getValue() != null;
+
+            if (!entry.getValue().isDone())
+                continue;
+
+            it.remove();
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Affinity cached values were cleared: " + (oldSize - affMap.size()));
+    }
+
 
     /**
      * Maps keys to nodes for given cache.
@@ -358,6 +391,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     @SuppressWarnings("ErrorNotRethrown")
     @Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer)
         throws IgniteCheckedException {
+
+        assert cacheName != null;
+
         AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
 
         IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
@@ -658,7 +694,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private static class AffinityAssignmentKey {
+    private static class AffinityAssignmentKey implements Comparable<AffinityAssignmentKey> {
         /** */
         private String cacheName;
 
@@ -669,11 +705,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
          * @param cacheName Cache name.
          * @param topVer Topology version.
          */
-        private AffinityAssignmentKey(String cacheName, @NotNull AffinityTopologyVersion topVer) {
+        private AffinityAssignmentKey(@NotNull String cacheName, @NotNull AffinityTopologyVersion topVer) {
             this.cacheName = cacheName;
             this.topVer = topVer;
         }
 
+        /**
+         * Current constructor should be used only in removeCachedAffinity for creating of the special keys for removing.
+         *
+         * @param topVer Topology version.
+         */
+        private AffinityAssignmentKey(@NotNull AffinityTopologyVersion topVer) {
+            this.topVer = topVer;
+        }
+
         /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             if (this == o)
@@ -700,6 +745,32 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         @Override public String toString() {
             return S.toString(AffinityAssignmentKey.class, this);
         }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(AffinityAssignmentKey o) {
+            assert o != null;
+
+            if (this == o)
+                return 0;
+
+            int res = this.topVer.compareTo(o.topVer);
+
+            // Key with null cache name must be less than any key with not null cache name for the same topVer.
+            if (res == 0) {
+                if (cacheName == null && o.cacheName != null)
+                    return -1;
+
+                if (cacheName != null && o.cacheName == null)
+                    return 1;
+
+                if (cacheName == null && o.cacheName == null)
+                    return 0;
+
+                return cacheName.compareTo(o.cacheName);
+            }
+
+            return res;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
index 605cc5f..f89d9ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
@@ -89,34 +89,34 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
 
             Ignite ignite = startGrid(0);
 
-            checkHistory(ignite, F.asList(topVer(1, 0)), 1);
+            checkHistory(ignite, F.asList(topVer(1, 0)), 1); //fullHistSize = 1
 
             startGrid(1);
 
             checkHistory(ignite, F.asList(
-                topVer(1, 0),
-                topVer(2, 0),
-                topVer(2, 1)),
+                topVer(1, 0), // FullHistSize = 1.
+                topVer(2, 0), // FullHistSize = 2.
+                topVer(2, 1)), // FullHistSize = 3.
                 3);
 
             startGrid(2);
 
             checkHistory(ignite, F.asList(
-                topVer(1, 0),
-                topVer(2, 0),
-                topVer(2, 1),
-                topVer(3, 0),
-                topVer(3, 1)),
+                topVer(1, 0), // FullHistSize = 1.
+                topVer(2, 0), // FullHistSize = 2.
+                topVer(2, 1), // FullHistSize = 3.
+                topVer(3, 0), // FullHistSize = 4.
+                topVer(3, 1)), // FullHistSize = 5.
                 5);
 
             startGrid(3);
 
             checkHistory(ignite, F.asList(
-                topVer(2, 1),
-                topVer(3, 0),
-                topVer(3, 1),
-                topVer(4, 0),
-                topVer(4, 1)),
+                topVer(2, 1), // FullHistSize = 3.
+                topVer(3, 0), // FullHistSize = 4.
+                topVer(3, 1), // FullHistSize = 5.
+                topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(4, 1)), // FullHistSize = 5.
                 5);
 
             client = true;
@@ -126,13 +126,11 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
             stopGrid(4);
 
             checkHistory(ignite, F.asList(
-                topVer(2, 1),
-                topVer(3, 0),
-                topVer(3, 1),
-                topVer(4, 0),
-                topVer(4, 1),
-                topVer(5, 0),
-                topVer(6, 0)),
+                topVer(3, 1), // FullHistSize = 5.
+                topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(4, 1), // FullHistSize = 5.
+                topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(6, 0)), // FullHistSize = 5.
                 5);
 
             startGrid(4);
@@ -140,15 +138,11 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
             stopGrid(4);
 
             checkHistory(ignite, F.asList(
-                topVer(2, 1),
-                topVer(3, 0),
-                topVer(3, 1),
-                topVer(4, 0),
-                topVer(4, 1),
-                topVer(5, 0),
-                topVer(6, 0),
-                topVer(7, 0),
-                topVer(8, 0)),
+                topVer(4, 1), // FullHistSize = 5.
+                topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(6, 0), // FullHistSize = 5.
+                topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(8, 0)), // FullHistSize = 5.
                 5);
 
             startGrid(4);
@@ -156,28 +150,24 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
             stopGrid(4);
 
             checkHistory(ignite, F.asList(
-                topVer(5, 0),
-                topVer(6, 0),
-                topVer(7, 0),
-                topVer(8, 0),
-                topVer(9, 0),
-                topVer(10, 0)),
-                0);
+                topVer(6, 0), // FullHistSize = 5.
+                topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(8, 0), // FullHistSize = 5.
+                topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(10, 0)), // FullHistSize = 5.
+                5);
 
             client = false;
 
             startGrid(4);
 
             checkHistory(ignite, F.asList(
-                topVer(5, 0),
-                topVer(6, 0),
-                topVer(7, 0),
-                topVer(8, 0),
-                topVer(9, 0),
-                topVer(10, 0),
-                topVer(11, 0),
-                topVer(11, 1)),
-                2);
+                topVer(8, 0), // FullHistSize = 5.
+                topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(10, 0), // FullHistSize = 5.
+                topVer(11, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+                topVer(11, 1)), // FullHistSize = 5.
+                5);
         }
         finally {
             if (histProp != null)
@@ -203,9 +193,9 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
         for (GridCacheContext cctx : proc.context().cacheContexts()) {
             GridAffinityAssignmentCache aff = GridTestUtils.getFieldValue(cctx.affinity(), "aff");
 
-            AtomicInteger histSize = GridTestUtils.getFieldValue(aff, "histSize");
+            AtomicInteger fullHistSize = GridTestUtils.getFieldValue(aff, "fullHistSize");
 
-            assertEquals(expSize, histSize.get());
+            assertEquals(expSize, fullHistSize.get());
 
             Map<AffinityTopologyVersion, Object> cache = GridTestUtils.getFieldValue(aff, "affCache");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
new file mode 100644
index 0000000..3b6857d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
+/**
+ * Tests for {@link GridAffinityProcessor}.
+ */
+@GridCommonTest(group = "Affinity Processor")
+public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest {
+    /** Max value for affinity history size name. Should be the same as in GridAffinityAssignmentCache.MAX_HIST_SIZE */
+    private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100);
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setForceServerMode(true);
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName(CACHE_NAME);
+
+        cacheCfg.setStoreKeepBinary(true);
+
+        cacheCfg.setCacheMode(CacheMode.LOCAL);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test affinity functions caching and clean up.
+     *
+     * @throws Exception In case of any exception.
+     */
+    public void testAffinityProcessor() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        IgniteKernal grid = (IgniteKernal)grid(0);
+
+        IgniteCache<String, String> cache;
+
+        IgniteCache<String, String> globalCache = getOrCreateGlobalCache(ignite);
+
+        IgniteDataStreamer<String, String> globalStreamer;
+
+        int count = MAX_HIST_SIZE * 4;
+
+        int size;
+
+        do {
+            try {
+                cache = createLocalCache(ignite, count);
+
+                cache.put("Key" + count, "Value" + count);
+
+                cache.destroy();
+
+                globalStreamer = createGlobalStreamer(ignite, globalCache);
+
+                globalStreamer.addData("GlobalKey" + count, "GlobalValue" + count);
+
+                globalStreamer.flush();
+
+                globalStreamer.close();
+
+                size = ((ConcurrentSkipListMap)GridTestUtils.getFieldValue(grid.context().affinity(), "affMap")).size();
+
+                assertTrue("Cache has size that bigger then expected [size=" + size + "" +
+                    ", expLimit=" + MAX_HIST_SIZE * 3 + "]", size < MAX_HIST_SIZE * 3);
+            }
+            catch (Exception e) {
+                fail("Error was handled [" + e.getMessage() + "]");
+            }
+        }
+        while (count-- > 0);
+    }
+
+    /**
+     * Creates global cache.
+     *
+     * @param ignite instance of {@code Ignite}.
+     * @param id unique id for local cache.
+     * @return local cache instance.
+     */
+    private static IgniteCache<String, String> createLocalCache(Ignite ignite, long id) {
+        final String cacheName = "localCache" + id;
+
+        final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>();
+
+        cCfg.setName(cacheName);
+
+        cCfg.setCacheMode(CacheMode.LOCAL);
+
+        cCfg.setGroupName("some group");
+
+        ignite.destroyCache(cacheName); // Local cache is not really local - reference can be kept by other nodes if restart during the load happens.
+
+        return ignite.createCache(cCfg).withKeepBinary();
+    }
+
+    /**
+     * Gets or creates global cache.
+     *
+     * @param ignite instance of {@code Ignite}.
+     * @return global cache instance.
+     */
+    private static IgniteCache<String, String> getOrCreateGlobalCache(Ignite ignite) {
+        final String cacheName = "GlobalCache";
+
+        final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>();
+
+        cCfg.setName(cacheName);
+
+        cCfg.setStoreKeepBinary(true);
+
+        cCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        cCfg.setOnheapCacheEnabled(false);
+
+        cCfg.setCopyOnRead(false);
+
+        cCfg.setBackups(0);
+
+        cCfg.setWriteBehindEnabled(false);
+
+        cCfg.setReadThrough(false);
+
+        return ignite.getOrCreateCache(cCfg).withKeepBinary();
+    }
+
+    /**
+     * Creates streamer for global cache.
+     *
+     * @param ignite instance of {@code Ignite}.
+     * @param cache instance of global cache.
+     * @return instance of {@code IgniteDataStreamer}.
+     */
+    private static IgniteDataStreamer<String, String> createGlobalStreamer(Ignite ignite,
+        IgniteCache<String, String> cache) {
+        IgniteDataStreamer<String, String> streamer = ignite.dataStreamer(cache.getName());
+
+        streamer.allowOverwrite(true);
+
+        streamer.skipStore(true);
+
+        streamer.keepBinary(false);
+
+        return streamer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/09002f2e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index e71a569..cc93c5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
 import org.apache.ignite.internal.MarshallerContextLockingSelfTest;
 import org.apache.ignite.internal.TransactionsMXBeanImplTest;
 import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest;
+import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorMemoryLeakTest;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest;
 import org.apache.ignite.internal.processors.cache.GridLocalIgniteSerializationTest;
 import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest;
@@ -133,6 +134,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         GridTestUtils.addTestIfNeeded(suite, GridReleaseTypeSelfTest.class, ignoredTests);
         suite.addTestSuite(GridProductVersionSelfTest.class);
         suite.addTestSuite(GridAffinityProcessorRendezvousSelfTest.class);
+        suite.addTestSuite(GridAffinityProcessorMemoryLeakTest.class);
         suite.addTestSuite(GridClosureProcessorSelfTest.class);
         suite.addTestSuite(GridClosureProcessorRemoteTest.class);
         suite.addTestSuite(GridClosureSerializationTest.class);