You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/03/29 13:33:07 UTC

[ignite] branch master updated: IGNITE-11465 Multiple client leave/join events may wipe affinity assignment history and cause transactions fail - Fixes #6217.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e2c198d  IGNITE-11465 Multiple client leave/join events may wipe affinity assignment history and cause transactions fail - Fixes #6217.
e2c198d is described below

commit e2c198d59842e048f04486affc29d64cc0e4299a
Author: Ivan Rakov <ir...@apache.org>
AuthorDate: Fri Mar 29 16:28:38 2019 +0300

    IGNITE-11465 Multiple client leave/join events may wipe affinity assignment history and cause transactions fail - Fixes #6217.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../jol/GridAffinityAssignmentJolBenchmark.java    |   5 +-
 .../affinity/GridAffinityAssignmentCache.java      | 108 +++++--
 .../affinity/HistoryAffinityAssignment.java        | 358 ++-------------------
 ...ent.java => HistoryAffinityAssignmentImpl.java} |  18 +-
 .../HistoryAffinityAssignmentShallowCopy.java      | 107 ++++++
 .../cache/affinity/AffinityHistoryCleanupTest.java |  58 +++-
 .../GridAffinityProcessorMemoryLeakTest.java       |  27 +-
 .../GridHistoryAffinityAssignmentTest.java         |   4 +-
 .../cache/CacheNoAffinityExchangeTest.java         |  85 +++++
 9 files changed, 381 insertions(+), 389 deletions(-)

diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
index b89cbe6..f154341 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.processors.affinity.HistoryAffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.HistoryAffinityAssignmentImpl;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -252,7 +253,7 @@ public class GridAffinityAssignmentJolBenchmark {
 
             AffinityTopologyVersion topVer = new AffinityTopologyVersion(i + 1, 0);
             GridAffinityAssignmentV2 a = new GridAffinityAssignmentV2(topVer, lateAssignmemnt, idealAssignment);
-            HistoryAffinityAssignment h = new HistoryAffinityAssignment(a, backups);
+            HistoryAffinityAssignment h = new HistoryAffinityAssignmentImpl(a, backups);
 
             if (!lateAssignmemnt.equals(h.assignment()))
                 throw new RuntimeException();
@@ -273,7 +274,7 @@ public class GridAffinityAssignmentJolBenchmark {
             }
 
             GridAffinityAssignmentV2 a0 = new GridAffinityAssignmentV2(topVer0, assignment, idealAssignment);
-            HistoryAffinityAssignment h0 = new HistoryAffinityAssignment(a0, backups);
+            HistoryAffinityAssignment h0 = new HistoryAffinityAssignmentImpl(a0, backups);
 
             if (!assignment.equals(h0.assignment()))
                 throw new RuntimeException();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 0335552..0f468ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -66,7 +66,10 @@ import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVE
  */
 public class GridAffinityAssignmentCache {
     /** Cleanup history size. */
-    private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500);
+    private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 50);
+
+    /** Cleanup history links size (calculated by both real entries and shallow copies). */
+    private final int MAX_HIST_LINKS_SIZE = MAX_HIST_SIZE * 10;
 
     /** Partition distribution. */
     private final float partDistribution = getFloat(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, 50f);
@@ -207,7 +210,9 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignmentV2 assignment = new GridAffinityAssignmentV2(topVer, affAssignment, idealAssignment);
 
-        HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignment, backups));
+        HistoryAffinityAssignmentImpl newHistEntry = new HistoryAffinityAssignmentImpl(assignment, backups);
+
+        HistoryAffinityAssignment existing = affCache.put(topVer, newHistEntry);
 
         head.set(assignment);
 
@@ -221,9 +226,7 @@ public class GridAffinityAssignmentCache {
             }
         }
 
-        // In case if value was replaced there is no sense to clean the history.
-        if (hAff == null)
-            onHistoryAdded();
+        onHistoryAdded(existing, newHistEntry);
 
         if (log.isTraceEnabled()) {
             log.trace("New affinity assignment [grp=" + cacheOrGrpName
@@ -491,7 +494,17 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignmentV2 assignmentCpy = new GridAffinityAssignmentV2(topVer, aff);
 
-        HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy, backups));
+        AffinityTopologyVersion prevVer = topVer.minorTopologyVersion() == 0 ?
+            new AffinityTopologyVersion(topVer.topologyVersion() - 1, Integer.MAX_VALUE) :
+            new AffinityTopologyVersion(topVer.topologyVersion(), topVer.minorTopologyVersion() - 1);
+
+        Map.Entry<AffinityTopologyVersion, HistoryAffinityAssignment> prevHistEntry = affCache.floorEntry(prevVer);
+
+        HistoryAffinityAssignment newHistEntry = (prevHistEntry == null) ?
+            new HistoryAffinityAssignmentImpl(assignmentCpy, backups) :
+            new HistoryAffinityAssignmentShallowCopy(prevHistEntry.getValue().origin(), topVer);
+
+        HistoryAffinityAssignment existing = affCache.put(topVer, newHistEntry);
 
         head.set(assignmentCpy);
 
@@ -505,9 +518,7 @@ public class GridAffinityAssignmentCache {
             }
         }
 
-        // In case if value was replaced there is no sense to clean the history.
-        if (hAff == null)
-            onHistoryAdded();
+        onHistoryAdded(existing, newHistEntry);
     }
 
     /**
@@ -680,10 +691,14 @@ public class GridAffinityAssignmentCache {
     /**
      * Get cached affinity for specified topology version.
      *
-     * @param topVer Topology version.
+     * @param topVer Topology version for which affinity assignment is requested.
+     * @param lastAffChangeTopVer Topology version of last affinity assignment change.
      * @return Cached affinity.
      */
-    public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer, AffinityTopologyVersion lastAffChangeTopVer) {
+    public AffinityAssignment cachedAffinity(
+        AffinityTopologyVersion topVer,
+        AffinityTopologyVersion lastAffChangeTopVer
+    ) {
         if (topVer.equals(AffinityTopologyVersion.NONE))
             topVer = lastAffChangeTopVer = lastVersion();
         else {
@@ -705,11 +720,23 @@ public class GridAffinityAssignmentCache {
             if (e != null)
                 cache = e.getValue();
 
-            if (cache == null || cache.topologyVersion().compareTo(topVer) > 0) {
+            if (cache == null) {
+                throw new IllegalStateException("Getting affinity for too old topology version that is already " +
+                    "out of history [locNode=" + ctx.discovery().localNode() +
+                    ", grp=" + cacheOrGrpName +
+                    ", topVer=" + topVer +
+                    ", lastAffChangeTopVer=" + lastAffChangeTopVer +
+                    ", head=" + head.get().topologyVersion() +
+                    ", history=" + affCache.keySet() +
+                    ']');
+            }
+
+            if (cache.topologyVersion().compareTo(topVer) > 0) {
                 throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " +
                     "calculated [locNode=" + ctx.discovery().localNode() +
                     ", grp=" + cacheOrGrpName +
                     ", topVer=" + topVer +
+                    ", lastAffChangeTopVer=" + lastAffChangeTopVer +
                     ", head=" + head.get().topologyVersion() +
                     ", history=" + affCache.keySet() +
                     ']');
@@ -810,23 +837,62 @@ public class GridAffinityAssignmentCache {
 
     /**
      * Cleaning the affinity history.
+     *
+     * @param replaced Replaced entry in case history item was already present, null otherwise.
+     * @param added New history item.
      */
-    private void onHistoryAdded() {
-        if (fullHistSize.incrementAndGet() > MAX_HIST_SIZE) {
-            Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
+    private void onHistoryAdded(
+        HistoryAffinityAssignment replaced,
+        HistoryAffinityAssignment added
+    ) {
+        boolean cleanupNeeded = false;
 
-            int rmvCnt = MAX_HIST_SIZE / 2;
+        if (replaced == null) {
+            cleanupNeeded = true;
+
+            if (added.requiresHistoryCleanup())
+                fullHistSize.incrementAndGet();
+        }
+        else {
+            if (replaced.requiresHistoryCleanup() != added.requiresHistoryCleanup()) {
+                if (added.requiresHistoryCleanup()) {
+                    cleanupNeeded = true;
+
+                    fullHistSize.incrementAndGet();
+                }
+                else
+                    fullHistSize.decrementAndGet();
+            }
+        }
+
+        if (!cleanupNeeded)
+            return;
+
+        int fullSize = fullHistSize.get();
+
+        int linksSize = affCache.size();
+
+        int fullRmvCnt = fullSize > MAX_HIST_SIZE ? (MAX_HIST_SIZE / 2) : 0;
+
+        int linksRmvCnt = linksSize > MAX_HIST_LINKS_SIZE ? (MAX_HIST_LINKS_SIZE / 2) : 0;
+
+        if (fullRmvCnt > 0 || linksRmvCnt > 0) {
+            Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
 
             AffinityTopologyVersion topVerRmv = null;
 
-            while (it.hasNext() && rmvCnt > 0) {
-                AffinityAssignment aff0 = it.next();
+            while (it.hasNext() && (fullRmvCnt > 0 || linksRmvCnt > 0)) {
+                HistoryAffinityAssignment aff0 = it.next();
 
-                it.remove();
+                if (aff0.requiresHistoryCleanup()) { // Don't decrement counter in case of shallow copy removal.
+                    fullRmvCnt--;
+
+                    fullHistSize.decrementAndGet();
+                }
 
-                rmvCnt--;
+                linksRmvCnt--;
 
-                fullHistSize.decrementAndGet();
+                it.remove();
 
                 topVerRmv = aff0.topologyVersion();
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index c6c0783..19bd88c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -1,343 +1,37 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.ignite.internal.processors.affinity;
 
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.BitSetIntSet;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
 /**
- *
+ * Interface for historical calculated affinity assignment.
  */
-@SuppressWarnings("ForLoopReplaceableByForEach")
-public class HistoryAffinityAssignment implements AffinityAssignment {
-    /** */
-    private final AffinityTopologyVersion topVer;
-
-    /** */
-    private final List<List<ClusterNode>> assignment;
-
-    /** */
-    private final List<List<ClusterNode>> idealAssignment;
-
-    /** */
-    private final ClusterNode[] nodes;
-
-    /** Ideal assignments are stored as sequences of indexes in nodes array. */
-    private final char[] idealParts;
-
-    /** Diff with ideal. */
-    private final Map<Integer, char[]> assignmentDiff;
-
+public interface HistoryAffinityAssignment extends AffinityAssignment {
     /**
-     * @param assign Assignment.
-     * @param backups Backups.
+     * Should return true if instance is "heavy" and should be taken into account during history size management.
+     *
+     * @return <code>true</code> if adding this instance to history should trigger size check and possible cleanup.
      */
-    public HistoryAffinityAssignment(AffinityAssignment assign, int backups) {
-        topVer = assign.topologyVersion();
-
-        if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION || backups > IGNITE_AFFINITY_BACKUPS_THRESHOLD) {
-            assignment = assign.assignment();
-
-            idealAssignment = assign.idealAssignment();
-
-            nodes = null;
-
-            idealParts = null;
-
-            assignmentDiff = null;
-
-            return;
-        }
-
-        List<List<ClusterNode>> assignment = assign.assignment();
-        List<List<ClusterNode>> idealAssignment = assign.idealAssignment();
-
-        int min = Integer.MAX_VALUE;
-        int max = 0;
-
-        for (List<ClusterNode> nodes : idealAssignment) { // Estimate required size.
-            int size = nodes.size();
-
-            if (size > max)
-                max = size;
-
-            if (size < min)
-                min = size;
-        }
-
-        if (max != min) {
-            this.assignment = assign.assignment();
-
-            this.idealAssignment = assign.idealAssignment();
-
-            nodes = null;
-
-            idealParts = null;
-
-            assignmentDiff = null;
-
-            return;
-        }
-
-        int cpys = max;
-
-        boolean same = assignment == idealAssignment;
-
-        int partsCnt = assignment.size();
-
-        idealParts = new char[partsCnt * cpys];
-
-        Map<ClusterNode, Character> orderMap = new HashMap<>();
-
-        char order = 1; // Char type is used as unsigned short to avoid conversions.
-
-        assignmentDiff = new HashMap<>();
-
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> curr = assignment.get(p);
-            List<ClusterNode> ideal = idealAssignment.get(p);
-
-            for (int i = 0; i < ideal.size(); i++) {
-                ClusterNode node = ideal.get(i);
-
-                Character nodeOrder = orderMap.get(node);
-
-                if (nodeOrder == null)
-                    orderMap.put(node, (nodeOrder = order++));
-
-                idealParts[p * cpys + i] = nodeOrder;
-            }
-
-            if (!same && !curr.equals(ideal)) {
-                char[] idx = new char[curr.size()];
-
-                assignmentDiff.put(p, idx);
-
-                for (int i = 0; i < curr.size(); i++) {
-                    ClusterNode node = curr.get(i);
-
-                    Character nodeOrder = orderMap.get(node);
-
-                    if (nodeOrder == null)
-                        orderMap.put(node, (nodeOrder = order++));
-
-                    idx[i] = nodeOrder;
-                }
-            }
-        }
-
-        // Fill array according to assigned order.
-        nodes = orderMap.keySet().stream().toArray(ClusterNode[]::new);
-
-        Arrays.sort(nodes, (o1, o2) -> orderMap.get(o1).compareTo(orderMap.get(o2)));
-
-        this.idealAssignment = new AbstractList<List<ClusterNode>>() {
-            @Override public List<ClusterNode> get(int idx) {
-                return partitionNodes(idx, true, cpys);
-            }
-
-            @Override public int size() {
-                return partsCnt;
-            }
-        };
-
-        this.assignment = same ? this.idealAssignment : new AbstractList<List<ClusterNode>>() {
-            @Override public List<ClusterNode> get(int idx) {
-                return partitionNodes(idx, false, cpys);
-            }
-
-            @Override public int size() {
-                return partsCnt;
-            }
-        };
-
-        assert this.assignment.equals(assign.assignment()) : "new=" + this.assignment + ", old=" + assign.assignment();
-
-        assert this.idealAssignment.equals(assign.idealAssignment()) :
-            "new=" + this.idealAssignment + ", old=" + assign.idealAssignment();
-    }
+    public boolean requiresHistoryCleanup();
 
     /**
-     * @param p Partion.
-     * @param ideal {@code True} for ideal assignment.
-     * @param cpys Copies.
+     * In case this instance is lightweight wrapper of another instance, this method should return reference
+     * to an original one. Otherwise, it should return <code>this</code> reference.
+     *
+     * @return Original instance, or <code>this</code> if this instance is not a wrapper.
      */
-    private List<ClusterNode> partitionNodes(int p, boolean ideal, int cpys) {
-        char[] order;
-
-        if (!ideal && (order = assignmentDiff.get(p)) != null) {
-            List<ClusterNode> ret = new ArrayList<>(order.length);
-
-            for (int i = 0; i < order.length; i++)
-                ret.add(nodes[order[i] - 1]);
-
-            return ret;
-        }
-
-        List<ClusterNode> ret = new ArrayList<>(cpys);
-
-        for (int i = 0; i < cpys; i++) {
-            char ord = idealParts[p * cpys + i];
-
-            if (ord == 0) // Zero
-                break;
-
-            ret.add(nodes[ord - 1]);
-        }
-
-        return ret;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public List<List<ClusterNode>> idealAssignment() {
-        return idealAssignment;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<List<ClusterNode>> assignment() {
-        return assignment;
-    }
-
-    /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<ClusterNode> get(int part) {
-        assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
-            " [part=" + part + ", partitions=" + assignment.size() + ']';
-
-        return assignment.get(part);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> getIds(int part) {
-        assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
-            " [part=" + part + ", partitions=" + assignment.size() + ']';
-
-        if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION)
-            return assignments2ids(assignment.get(part));
-        else {
-            List<ClusterNode> nodes = assignment.get(part);
-
-            return nodes.size() > AffinityAssignment.IGNITE_AFFINITY_BACKUPS_THRESHOLD
-                    ? assignments2ids(nodes)
-                    : F.viewReadOnly(nodes, F.node2id());
-        }
-     }
-
-    /** {@inheritDoc} */
-    @Override public Set<ClusterNode> nodes() {
-        Set<ClusterNode> res = new HashSet<>();
-
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> nodes = assignment.get(p);
-
-            if (!F.isEmpty(nodes))
-                res.addAll(nodes);
-        }
-
-        return Collections.unmodifiableSet(res);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Set<ClusterNode> primaryPartitionNodes() {
-        Set<ClusterNode> res = new HashSet<>();
-
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> nodes = assignment.get(p);
-
-            if (!F.isEmpty(nodes))
-                res.add(nodes.get(0));
-        }
-
-        return Collections.unmodifiableSet(res);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Set<Integer> primaryPartitions(UUID nodeId) {
-        Set<Integer> res = IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION ? new HashSet<>() : new BitSetIntSet();
-
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> nodes = assignment.get(p);
-
-            if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
-                res.add(p);
-        }
-
-        return Collections.unmodifiableSet(res);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Set<Integer> backupPartitions(UUID nodeId) {
-        Set<Integer> res = IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION ? new HashSet<>() : new BitSetIntSet();
-
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> nodes = assignment.get(p);
-
-            for (int i = 1; i < nodes.size(); i++) {
-                ClusterNode node = nodes.get(i);
-
-                if (node.id().equals(nodeId)) {
-                    res.add(p);
-
-                    break;
-                }
-            }
-        }
-
-        return Collections.unmodifiableSet(res);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return topVer.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("SimplifiableIfStatement")
-    @Override public boolean equals(Object o) {
-        if (o == this)
-            return true;
-
-        if (o == null || !(o instanceof AffinityAssignment))
-            return false;
-
-        return topVer.equals(((AffinityAssignment)o).topologyVersion());
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HistoryAffinityAssignment.class, this);
-    }
+    public HistoryAffinityAssignment origin();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
similarity index 95%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
index c6c0783..1eda706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
@@ -34,10 +34,10 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
- *
+ * Heap-space optimized version of calculated affinity assignment.
  */
 @SuppressWarnings("ForLoopReplaceableByForEach")
-public class HistoryAffinityAssignment implements AffinityAssignment {
+public class HistoryAffinityAssignmentImpl implements HistoryAffinityAssignment {
     /** */
     private final AffinityTopologyVersion topVer;
 
@@ -60,7 +60,7 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
      * @param assign Assignment.
      * @param backups Backups.
      */
-    public HistoryAffinityAssignment(AffinityAssignment assign, int backups) {
+    public HistoryAffinityAssignmentImpl(AffinityAssignment assign, int backups) {
         topVer = assign.topologyVersion();
 
         if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION || backups > IGNITE_AFFINITY_BACKUPS_THRESHOLD) {
@@ -320,6 +320,16 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean requiresHistoryCleanup() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HistoryAffinityAssignment origin() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
     @Override public int hashCode() {
         return topVer.hashCode();
     }
@@ -338,6 +348,6 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(HistoryAffinityAssignment.class, this);
+        return S.toString(HistoryAffinityAssignmentImpl.class, this);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
new file mode 100644
index 0000000..4fcea72
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
@@ -0,0 +1,107 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Shallow copy that contains reference to delegate {@link HistoryAffinityAssignment}.
+ */
+public class HistoryAffinityAssignmentShallowCopy implements HistoryAffinityAssignment {
+    /** History assignment. */
+    private final HistoryAffinityAssignment histAssignment;
+
+    /** Topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /**
+     * @param histAssignment History assignment.
+     * @param topVer Topology version.
+     */
+    public HistoryAffinityAssignmentShallowCopy(
+        HistoryAffinityAssignment histAssignment,
+        AffinityTopologyVersion topVer
+    ) {
+        this.histAssignment = histAssignment;
+        this.topVer = topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean requiresHistoryCleanup() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> idealAssignment() {
+        return histAssignment.idealAssignment();
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignment() {
+        return histAssignment.assignment();
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> get(int part) {
+        return histAssignment.get(part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> getIds(int part) {
+        return histAssignment.getIds(part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> nodes() {
+        return histAssignment.nodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> primaryPartitionNodes() {
+        return histAssignment.primaryPartitionNodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+        return histAssignment.primaryPartitions(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> backupPartitions(UUID nodeId) {
+        return histAssignment.backupPartitions(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HistoryAffinityAssignment origin() {
+        return histAssignment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HistoryAffinityAssignmentShallowCopy.class, this);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
index 6de9228..12096f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
@@ -21,7 +21,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -119,11 +118,13 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
         stopGrid(4);
 
         checkHistory(ignite, F.asList(
+            topVer(2, 1), // FullHistSize = 3.
+            topVer(3, 0), // FullHistSize = 4.
             topVer(3, 1), // FullHistSize = 5.
             topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
             topVer(4, 1), // FullHistSize = 5.
-            topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
-            topVer(6, 0)), // FullHistSize = 5.
+            topVer(5, 0), // Client event -> FullHistSize = 5.
+            topVer(6, 0)), // Client event -> FullHistSize = 5.
             5);
 
         startGrid(4);
@@ -131,11 +132,15 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
         stopGrid(4);
 
         checkHistory(ignite, F.asList(
+            topVer(2, 1), // FullHistSize = 3.
+            topVer(3, 0), // FullHistSize = 4.
+            topVer(3, 1), // FullHistSize = 5.
+            topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
             topVer(4, 1), // FullHistSize = 5.
-            topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
-            topVer(6, 0), // FullHistSize = 5.
-            topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
-            topVer(8, 0)), // FullHistSize = 5.
+            topVer(5, 0), // Client event -> FullHistSize = 5.
+            topVer(6, 0), // Client event -> FullHistSize = 5.
+            topVer(7, 0), // Client event -> FullHistSize = 5.
+            topVer(8, 0)), // Client event -> FullHistSize = 5.
             5);
 
         startGrid(4);
@@ -143,11 +148,17 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
         stopGrid(4);
 
         checkHistory(ignite, F.asList(
-            topVer(6, 0), // FullHistSize = 5.
-            topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
-            topVer(8, 0), // FullHistSize = 5.
-            topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
-            topVer(10, 0)), // FullHistSize = 5.
+            topVer(2, 1), // FullHistSize = 3.
+            topVer(3, 0), // FullHistSize = 4.
+            topVer(3, 1), // FullHistSize = 5.
+            topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+            topVer(4, 1), // FullHistSize = 5.
+            topVer(5, 0), // Client event -> FullHistSize = 5.
+            topVer(6, 0), // Client event -> FullHistSize = 5.
+            topVer(7, 0), // Client event -> FullHistSize = 5.
+            topVer(8, 0), // Client event -> FullHistSize = 5.
+            topVer(9, 0), // Client event -> FullHistSize = 5.
+            topVer(10, 0)), // Client event -> FullHistSize = 5.
             5);
 
         client = false;
@@ -155,12 +166,29 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
         startGrid(4);
 
         checkHistory(ignite, F.asList(
-            topVer(8, 0), // FullHistSize = 5.
-            topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
-            topVer(10, 0), // FullHistSize = 5.
+            topVer(3, 1), // FullHistSize = 5.
+            topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+            topVer(4, 1), // FullHistSize = 5.
+            topVer(5, 0), // Client event -> FullHistSize = 5.
+            topVer(6, 0), // Client event -> FullHistSize = 5.
+            topVer(7, 0), // Client event -> FullHistSize = 5.
+            topVer(8, 0), // Client event -> FullHistSize = 5.
+            topVer(9, 0), // Client event -> FullHistSize = 5.
+            topVer(10, 0), // Client event -> FullHistSize = 5.
             topVer(11, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
             topVer(11, 1)), // FullHistSize = 5.
             5);
+
+        stopGrid(4);
+
+        startGrid(4);
+
+        checkHistory(ignite, F.asList(
+            topVer(11, 1), // FullHistSize = 5.
+            topVer(12, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+            topVer(13, 0), // FullHistSize = 5.
+            topVer(13, 1)), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+            4);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
index b841a87..1bb309f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
@@ -21,27 +21,23 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
-
 /**
  * Tests for {@link GridAffinityProcessor}.
  */
 @GridCommonTest(group = "Affinity Processor")
 public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest {
-    /** Max value for affinity history size name. Should be the same as in GridAffinityAssignmentCache.MAX_HIST_SIZE */
-    private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500);
-
     /** Cache name. */
     private static final String CACHE_NAME = "cache";
 
@@ -75,7 +71,10 @@ public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest
      * @throws Exception In case of any exception.
      */
     @Test
+    @WithSystemProperty(key = IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE, value = "10")
     public void testAffinityProcessor() throws Exception {
+        int maxHistSize = 10;
+
         Ignite ignite = startGrid(0);
 
         IgniteKernal grid = (IgniteKernal)grid(0);
@@ -86,21 +85,23 @@ public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest
 
         IgniteDataStreamer<String, String> globalStreamer;
 
-        int count = MAX_HIST_SIZE * 4;
+        int cnt = maxHistSize * 30;
+
+        int expLimit = cnt / 2;
 
         int size;
 
         do {
             try {
-                cache = createLocalCache(ignite, count);
+                cache = createLocalCache(ignite, cnt);
 
-                cache.put("Key" + count, "Value" + count);
+                cache.put("Key" + cnt, "Value" + cnt);
 
                 cache.destroy();
 
                 globalStreamer = createGlobalStreamer(ignite, globalCache);
 
-                globalStreamer.addData("GlobalKey" + count, "GlobalValue" + count);
+                globalStreamer.addData("GlobalKey" + cnt, "GlobalValue" + cnt);
 
                 globalStreamer.flush();
 
@@ -108,14 +109,14 @@ public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest
 
                 size = ((ConcurrentSkipListMap)GridTestUtils.getFieldValue(grid.context().affinity(), "affMap")).size();
 
-                assertTrue("Cache has size that bigger then expected [size=" + size + "" +
-                    ", expLimit=" + MAX_HIST_SIZE * 3 + "]", size < MAX_HIST_SIZE * 3);
+                assertTrue("Cache has size that bigger then expected [size=" + size +
+                    ", expLimit=" + expLimit + "]", size < expLimit);
             }
             catch (Exception e) {
                 fail("Error was handled [" + e.getMessage() + "]");
             }
         }
-        while (count-- > 0);
+        while (cnt-- > 0);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java
index 9fa209a..cf6eb68 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java
@@ -62,13 +62,13 @@ public class GridHistoryAffinityAssignmentTest extends GridCommonAbstractTest {
 
         AffinityTopologyVersion topVer = new AffinityTopologyVersion(1, 0);
         HistoryAffinityAssignment lateAssign =
-            new HistoryAffinityAssignment(new GridAffinityAssignmentV2(topVer, curr, ideal), 1);
+            new HistoryAffinityAssignmentImpl(new GridAffinityAssignmentV2(topVer, curr, ideal), 1);
 
         assertEquals("Late", curr, lateAssign.assignment());
         assertEquals("Ideal late", ideal, lateAssign.idealAssignment());
 
         HistoryAffinityAssignment idealAssign = new
-            HistoryAffinityAssignment(new GridAffinityAssignmentV2(topVer, ideal, ideal), 1);
+            HistoryAffinityAssignmentImpl(new GridAffinityAssignmentV2(topVer, ideal, ideal), 1);
 
         assertSame(idealAssign.assignment(), idealAssign.idealAssignment());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
index 66013b1..61d6c06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
@@ -19,17 +19,23 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.Lock;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
@@ -45,7 +51,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessag
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
 import org.junit.Test;
 
 /**
@@ -77,6 +85,11 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(new TestDiscoverySpi().setIpFinder(IP_FINDER));
 
+        cfg.setActiveOnStart(false);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+            new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024)));
+
         if (startClient) {
             cfg.setClientMode(true);
 
@@ -91,6 +104,8 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
+        startClient = false;
+
         super.afterTest();
     }
 
@@ -222,6 +237,8 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
         try {
             Ignite ig = startGridsMultiThreaded(4);
 
+            ig.cluster().active(true);
+
             IgniteCache<Integer, Integer> atomicCache = ig.createCache(new CacheConfiguration<Integer, Integer>()
                 .setName("atomic").setAtomicityMode(CacheAtomicityMode.ATOMIC).setCacheMode(CacheMode.REPLICATED));
 
@@ -287,6 +304,74 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests that multiple client events won't fail transactions due to affinity assignment history expiration.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE, value = "10")
+    public void testMulipleClientLeaveJoin() throws Exception {
+        Ignite ig = startGrids(2);
+
+        ig.cluster().active(true);
+
+        startClient = true;
+
+        IgniteEx stableClient = startGrid(2);
+
+        IgniteCache<Integer, Integer> stableClientTxCacheProxy = stableClient.createCache(
+            new CacheConfiguration<Integer, Integer>()
+                .setName("tx")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+        awaitPartitionMapExchange();
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                for (int i = 0; i < 10; i++) {
+                    try {
+                        startGrid(3);
+
+                        stopGrid(3);
+                    }
+                    catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        });
+
+        CountDownLatch clientTxLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture loadFut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try (Transaction tx = stableClient.transactions().txStart()) {
+                    ThreadLocalRandom r = ThreadLocalRandom.current();
+
+                    stableClientTxCacheProxy.put(r.nextInt(100), r.nextInt());
+
+                    try {
+                        clientTxLatch.await();
+                    }
+                    catch (InterruptedException e) {
+                        throw new IgniteInterruptedException(e);
+                    }
+
+                    tx.commit();
+                }
+            }
+        });
+
+        fut.get();
+
+        clientTxLatch.countDown();
+
+        loadFut.get();
+    }
+
+    /**
      *
      */
     public static class TestDiscoverySpi extends TcpDiscoverySpi {