You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/03 18:27:19 UTC

[1/4] ignite git commit: 1093

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-3 d208fa8ef -> 5e01b411a


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82ac8e08
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82ac8e08
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82ac8e08

Branch: refs/heads/ignite-1093-3
Commit: 82ac8e0832c01663e31c295a6f02e4cbda840a84
Parents: d208fa8
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 3 17:30:27 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 3 17:30:27 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java        | 16 ++++++----------
 .../dht/preloader/GridDhtPartitionDemander.java     |  5 +++--
 2 files changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/82ac8e08/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 541ac2a..cfb0462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -270,7 +270,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             unmarshall(nodeId, cacheMsg);
 
             if (cacheMsg.classError() != null)
-                processFailedMessage(nodeId, cacheMsg);
+                processFailedMessage(nodeId, cacheMsg, c);
             else
                 processMessage(nodeId, cacheMsg, c);
         }
@@ -314,7 +314,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param msg Message.
      * @throws IgniteCheckedException If failed.
      */
-    private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException {
+    private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c)
+        throws IgniteCheckedException {
         GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
 
         switch (msg.directType()) {
@@ -413,9 +414,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             break;
 
             case 45: {
-                GridDhtPartitionSupplyMessage req = (GridDhtPartitionSupplyMessage)msg;
-
-                U.error(log, "Supply message cannot be unmarshalled.", req.classError());
+                processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
             }
 
             break;
@@ -519,9 +518,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             break;
 
             case 114: {
-                GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
-
-                U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+                processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
             }
 
             break;
@@ -537,8 +534,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param msg Message.
      * @param c Closure.
      */
-    private void processMessage(UUID nodeId, GridCacheMessage msg,
-        IgniteBiInClosure<UUID, GridCacheMessage> c) {
+    private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) {
         try {
             // We will not end up with storing a bunch of new UUIDs
             // in each cache entry, since node ID is stored in NIO session

http://git-wip-us.apache.org/repos/asf/ignite/blob/82ac8e08/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 8f23291..35cedf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -514,7 +514,8 @@ public class GridDhtPartitionDemander {
 
         // Check whether there were class loading errors on unmarshal
         if (supply.classError() != null) {
-            U.warn(log, "Class got undeployed during preloading: " + supply.classError());
+            U.warn(log, "Rebalancing from node cancelled [node=" + id +
+                "]. Class got undeployed during preloading: " + supply.classError());
 
             fut.cancel(id);
 
@@ -613,7 +614,7 @@ public class GridDhtPartitionDemander {
 
             if (!topologyChanged(fut) && !fut.isDone()) {
                 // Send demand message.
-                cctx.io().sendOrderedMessage(node,rebalanceTopics.get(idx),
+                cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
                     d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
             }
         }


[3/4] ignite git commit: 1093

Posted by sb...@apache.org.
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cab5342
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cab5342
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cab5342

Branch: refs/heads/ignite-1093-3
Commit: 7cab53427809a3896a83a056614a1bc710944f22
Parents: c2a6787
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 3 18:41:15 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 3 18:41:15 2015 +0300

----------------------------------------------------------------------
 .../GridCacheRebalancingSyncSelfTest.java          | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7cab5342/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 6a7f701..4f96045 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
 import java.util.Map;
+import java.util.Random;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMode;
@@ -222,12 +223,16 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         Thread t1 = new Thread() {
             @Override public void run() {
+                Random rdm = new Random();
+
                 while (!concurrentStartFinished) {
-                    for (int i = 0; i < 0 + TEST_SIZE; i++) {
+                    for (int i = 0; i < TEST_SIZE; i++) {
                         if (i % (TEST_SIZE / 10) == 0)
                             log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
 
-                        ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(i, i + CACHE_NAME_DHT_PARTITIONED.hashCode() + 0);
+                        int ii = rdm.nextInt(TEST_SIZE);
+
+                        ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(ii, ii + CACHE_NAME_DHT_PARTITIONED.hashCode());
                     }
                 }
             }
@@ -250,8 +255,14 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         t2.start();
 
         startGrid(2);
+        startGrid(3);
+
+        stopGrid(2);
 
-        waitForRebalancing(2, 3);
+        startGrid(4);
+
+        waitForRebalancing(3, 6);
+        waitForRebalancing(4, 6);
 
         concurrentStartFinished = true;
 


[2/4] ignite git commit: 1093

Posted by sb...@apache.org.
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2a67879
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2a67879
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2a67879

Branch: refs/heads/ignite-1093-3
Commit: c2a6787949ee6b023e8bda3ff3d84aa17be318e2
Parents: 82ac8e0
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 3 18:32:21 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 3 18:32:21 2015 +0300

----------------------------------------------------------------------
 .../IgniteCacheP2pUnmarshallingErrorTest.java   |   2 +-
 ...eRebalancingUnmarshallingFailedSelfTest.java | 150 +++++++++++++++++++
 2 files changed, 151 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c2a67879/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index 1b2b84d..f4423f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -87,7 +87,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
     }
 
     /** Test key 1. */
-    public static class TestKey implements Externalizable {
+    protected static class TestKey implements Externalizable {
         /** Field. */
         @QuerySqlField(index = true)
         private String field;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2a67879/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
new file mode 100644
index 0000000..649abda
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -0,0 +1,150 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** partitioned cache name. */
+    protected static String CACHE = "cache";
+
+    /** Allows to change behavior of readExternal method. */
+    protected static AtomicInteger readCnt = new AtomicInteger();
+
+    /** Test key 1. */
+    private static class TestKey implements Externalizable {
+        /** Field. */
+        @QuerySqlField(index = true)
+        private String field;
+
+        /**
+         * @param field Test key 1.
+         */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /** Test key 1. */
+        public TestKey() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey key = (TestKey)o;
+
+            return !(field != null ? !field.equals(key.field) : key.field != null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return field != null ? field.hashCode() : 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(field);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            field = (String)in.readObject();
+
+            if (readCnt.decrementAndGet() <= 0)
+                throw new IOException("Class can not be unmarshalled.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<TestKey, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setName(CACHE);
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+        cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cfg.setBackups(1);
+        cfg.setRebalanceBatchSize(1);
+        cfg.setRebalanceBatchesPrefetchCount(1);
+        cfg.setRebalanceOrder(2);
+
+        iCfg.setCacheConfiguration(cfg);
+
+        return iCfg;
+    }
+
+    /**
+     * @throws Exception e.
+     */
+    public void test() throws Exception {
+        readCnt.set(Integer.MAX_VALUE);
+
+        startGrid(0);
+
+        for (int i = 0; i < 100; i++) {
+            grid(0).cache(CACHE).put(new TestKey(String.valueOf(i)), i);
+        }
+
+        readCnt.set(1);
+
+        startGrid(1);
+
+        readCnt.set(Integer.MAX_VALUE);
+
+        for (int i = 0; i < 50; i++) {
+            assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) != null;
+        }
+
+        stopGrid(0);
+
+        for (int i = 50; i < 100; i++) {
+            assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) == null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}


[4/4] ignite git commit: 1093

Posted by sb...@apache.org.
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e01b411
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e01b411
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e01b411

Branch: refs/heads/ignite-1093-3
Commit: 5e01b411a440a8254e2224c5e7fbc4df5aa00d43
Parents: 7cab534
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 3 20:27:04 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 3 20:27:04 2015 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionsReservation.java       |   2 +-
 .../GridCacheRebalancingSyncSelfTest.java       |  26 +++++
 ...eRebalancingUnmarshallingFailedSelfTest.java |   5 +-
 .../junits/common/GridCommonAbstractTest.java   | 110 +++++++++++++++++++
 4 files changed, 138 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5e01b411/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index 756326e..d12247e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -239,7 +239,7 @@ public class GridDhtPartitionsReservation implements GridReservable {
     }
 
     /**
-     * Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}.
+     * Must be checked in {@link GridDhtLocalPartition#tryEvict()}.
      * If returns {@code true} this reservation object becomes invalid and partitions
      * can be evicted or at least cleared.
      * Also this means that after returning {@code true} here method {@link #reserve()} can not

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e01b411/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 4f96045..2a64307 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -266,6 +266,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         concurrentStartFinished = true;
 
+        awaitPartitionMapExchange();
+
+        awaitPartitionEviction();
+
         t1.join();
         t2.join();
 
@@ -431,6 +435,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 5, 1);
         waitForRebalancing(4, 5, 1);
 
+        awaitPartitionMapExchange();
+
+        awaitPartitionEviction();
+
         checkSupplyContextMapIsEmpty();
 
         checkData(grid(4), 0, 1);
@@ -453,17 +461,35 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 6);
         waitForRebalancing(4, 6);
 
+        awaitPartitionMapExchange();
+
+        awaitPartitionEviction();
+
+        checkSupplyContextMapIsEmpty();
+
         stopGrid(0);
 
         waitForRebalancing(2, 7);
         waitForRebalancing(3, 7);
         waitForRebalancing(4, 7);
 
+        awaitPartitionMapExchange();
+
+        awaitPartitionEviction();
+
+        checkSupplyContextMapIsEmpty();
+
         stopGrid(2);
 
         waitForRebalancing(3, 8);
         waitForRebalancing(4, 8);
 
+        awaitPartitionMapExchange();
+
+        awaitPartitionEviction();
+
+        checkSupplyContextMapIsEmpty();
+
         t4.join();
 
         stopGrid(3);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e01b411/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
index 649abda..831e82d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -102,10 +102,7 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
         cfg.setName(CACHE);
         cfg.setCacheMode(CacheMode.PARTITIONED);
         cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cfg.setBackups(1);
-        cfg.setRebalanceBatchSize(1);
-        cfg.setRebalanceBatchesPrefetchCount(1);
-        cfg.setRebalanceOrder(2);
+        cfg.setBackups(0);
 
         iCfg.setCacheConfiguration(cfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e01b411/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 8d6eb16..c3d0690 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -63,6 +63,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
@@ -411,6 +413,114 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
     /**
      * @throws InterruptedException If interrupted.
+     * Should be run after awaitPartitionMapExchange.
+     */
+    @SuppressWarnings("BusyWait")
+    protected void awaitPartitionEviction() throws InterruptedException {
+        for (Ignite g : G.allGrids()) {
+            IgniteKernal g0 = (IgniteKernal)g;
+
+            for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) {
+                CacheConfiguration cfg = c.context().config();
+
+                if (cfg.getCacheMode() == PARTITIONED &&
+                    cfg.getRebalanceMode() != NONE &&
+                    g.cluster().nodes().size() > 1) {
+                    AffinityFunction aff = cfg.getAffinity();
+
+                    GridDhtCacheAdapter<?, ?> dht = dht(c);
+
+                    GridDhtPartitionTopology top = dht.topology();
+
+                    for (int p = 0; p < aff.partitions(); p++) {
+                        long start = 0;
+
+                        for (int i = 0; ; i++) {
+                            boolean match = false;
+
+                            AffinityTopologyVersion readyVer = dht.context().shared().exchange().readyAffinityVersion();
+
+                            if (readyVer.topologyVersion() > 0 && c.context().started()) {
+                                // Must map on updated version of topology.
+                                Collection<ClusterNode> affNodes =
+                                    g0.affinity(cfg.getName()).mapPartitionToPrimaryAndBackups(p);
+
+                                int exp = affNodes.size();
+
+                                GridDhtTopologyFuture topFut = top.topologyVersionFuture();
+
+                                Collection<ClusterNode> owners = (topFut != null && topFut.isDone()) ?
+                                    top.nodes(p, AffinityTopologyVersion.NONE) : Collections.<ClusterNode>emptyList();
+
+                                int actual = owners.size();
+
+                                GridDhtLocalPartition loc = top.localPartition(p, readyVer, false);
+
+                                if (loc != null && loc.state() == GridDhtPartitionState.RENTING) {
+                                    LT.warn(log(), null, "Waiting for evictions [" +
+                                        "grid=" + g.name() +
+                                        ", cache=" + cfg.getName() +
+                                        ", cacheId=" + dht.context().cacheId() +
+                                        ", topVer=" + top.topologyVersion() +
+                                        ", topFut=" + topFut +
+                                        ", p=" + p +
+                                        ", affNodesCnt=" + exp +
+                                        ", ownersCnt=" + actual +
+                                        ", affNodes=" + affNodes +
+                                        ", owners=" + owners +
+                                        ", locNode=" + g.cluster().localNode() + ']');
+                                }
+                                else
+                                    match = true;
+                            }
+                            else {
+                                LT.warn(log(), null, "Waiting for evictions [" +
+                                    "grid=" + g.name() +
+                                    ", cache=" + cfg.getName() +
+                                    ", cacheId=" + dht.context().cacheId() +
+                                    ", topVer=" + top.topologyVersion() +
+                                    ", started=" + dht.context().started() +
+                                    ", p=" + p +
+                                    ", readVer=" + readyVer +
+                                    ", locNode=" + g.cluster().localNode() + ']');
+                            }
+
+                            if (!match) {
+                                if (i == 0)
+                                    start = System.currentTimeMillis();
+
+                                if (System.currentTimeMillis() - start > 30_000) {
+                                    U.dumpThreads(log);
+
+                                    throw new IgniteException("Timeout of waiting for evictions [" +
+                                        "grid=" + g.name() +
+                                        ", cache=" + cfg.getName() +
+                                        ", cacheId=" + dht.context().cacheId() +
+                                        ", topVer=" + top.topologyVersion() +
+                                        ", p=" + p +
+                                        ", readVer=" + readyVer +
+                                        ", locNode=" + g.cluster().localNode() + ']');
+                                }
+
+                                Thread.sleep(200); // Busy wait.
+
+                                continue;
+                            }
+
+                            if (i > 0)
+                                log().warning("Finished waiting for topology map update [grid=" + g.name() +
+                                    ", p=" + p + ", duration=" + (System.currentTimeMillis() - start) + "ms]");
+
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @throws InterruptedException If interrupted.
      */
     @SuppressWarnings("BusyWait")
     protected void awaitPartitionMapExchange() throws InterruptedException {