You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/10 10:48:48 UTC

[50/50] [abbrv] ignite git commit: ignite-803: unmuted and changed tests for PARTITIONED cache

ignite-803: unmuted and changed tests for PARTITIONED cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5061f30d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5061f30d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5061f30d

Branch: refs/heads/ignite-801
Commit: 5061f30d986eff0188e1563e465a2cc8f054588c
Parents: af80486
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Nov 10 12:46:01 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Nov 10 12:46:01 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  12 --
 .../CacheDataStructuresManager.java             |   1 -
 .../GridFutureRemapTimeoutObject.java           |  72 ---------
 .../dht/GridPartitionedGetFuture.java           |  29 +---
 .../distributed/near/GridNearGetFuture.java     |  28 +---
 .../datastructures/GridCacheQueueAdapter.java   |   3 +-
 .../ignite/spi/discovery/DiscoverySpi.java      |   7 -
 ...eAbstractDataStructuresFailoverSelfTest.java | 150 +++++++++++++++++--
 ...rtitionedDataStructuresFailoverSelfTest.java |   7 +-
 ...edOffheapDataStructuresFailoverSelfTest.java |  10 +-
 10 files changed, 160 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 7b833a9..cd2f49c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -102,7 +102,6 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
-import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
@@ -1769,17 +1768,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Failure detection timeout used by discovery SPI. If the timeout is disabled then a value of the
-     * network timeout is returned.
-     *
-     * @return .
-     */
-    public long failureDetectionTimeout() {
-        return getSpi().failureDetectionTimeoutEnabled() ? ctx.config().getFailureDetectionTimeout() :
-            ctx.config().getNetworkTimeout();
-    }
-
-    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index ac90efc..930921b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -60,7 +60,6 @@ import org.apache.ignite.internal.processors.datastructures.GridTransactionalCac
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
deleted file mode 100644
index 72fdd4b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-
-/**
- * Future remap timeout object.
- */
-public class GridFutureRemapTimeoutObject extends GridTimeoutObjectAdapter {
-    /** */
-    private final GridFutureAdapter<?> fut;
-
-    /** Finished flag. */
-    private final AtomicBoolean finished = new AtomicBoolean();
-
-    /** Topology version to wait. */
-    private final AffinityTopologyVersion topVer;
-
-    /** Exception cause. */
-    private final IgniteCheckedException e;
-
-    /**
-     * @param fut Future.
-     * @param timeout Timeout.
-     * @param topVer Topology version timeout was created on.
-     * @param e Exception cause.
-     */
-    public GridFutureRemapTimeoutObject(
-        GridFutureAdapter<?> fut,
-        long timeout,
-        AffinityTopologyVersion topVer,
-        IgniteCheckedException e) {
-        super(timeout);
-
-        this.fut = fut;
-        this.topVer = topVer;
-        this.e = e;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onTimeout() {
-        if (finish()) // Fail the whole get future, else remap happened concurrently.
-            fut.onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + topVer, e));
-    }
-
-    /**
-     * @return Guard against concurrent completion.
-     */
-    public boolean finish() {
-        return finished.compareAndSet(false, true);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 89f1088..7654634 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -683,35 +682,23 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 final AffinityTopologyVersion updTopVer =
                     new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
 
-
-                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                    cctx.discovery().failureDetectionTimeout(),
-                    updTopVer,
-                    e);
-
                 cctx.affinity().affinityReadyFuture(updTopVer).listen(
                     new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            if (timeout.finish()) {
-                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+                            try {
+                                fut.get();
 
-                                try {
-                                    fut.get();
+                                // Remap.
+                                map(keys.keySet(), F.t(node, keys), updTopVer);
 
-                                    // Remap.
-                                    map(keys.keySet(), F.t(node, keys), updTopVer);
-
-                                    onDone(Collections.<K, V>emptyMap());
-                                }
-                                catch (IgniteCheckedException e) {
-                                    GridPartitionedGetFuture.this.onDone(e);
-                                }
+                                onDone(Collections.<K, V>emptyMap());
+                            }
+                            catch (IgniteCheckedException e) {
+                                GridPartitionedGetFuture.this.onDone(e);
                             }
                         }
                     }
                 );
-
-                cctx.kernalContext().timeout().addTimeoutObject(timeout);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9830288..e062695 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -884,34 +883,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                 final AffinityTopologyVersion updTopVer =
                     new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
 
-                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                    cctx.discovery().failureDetectionTimeout(),
-                    updTopVer,
-                    e);
-
                 cctx.affinity().affinityReadyFuture(updTopVer).listen(
                     new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            if (timeout.finish()) {
-                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
-                                try {
-                                    fut.get();
+                            try {
+                                fut.get();
 
-                                    // Remap.
-                                    map(keys.keySet(), F.t(node, keys), updTopVer);
+                                // Remap.
+                                map(keys.keySet(), F.t(node, keys), updTopVer);
 
-                                    onDone(Collections.<K, V>emptyMap());
-                                }
-                                catch (IgniteCheckedException e) {
-                                    GridNearGetFuture.this.onDone(e);
-                                }
+                                onDone(Collections.<K, V>emptyMap());
+                            }
+                            catch (IgniteCheckedException e) {
+                                GridNearGetFuture.this.onDone(e);
                             }
                         }
                     }
                 );
-
-                cctx.kernalContext().timeout().addTimeoutObject(timeout);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 0843eac..eb23ad7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -821,8 +821,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
                 }
 
                 next++;
-            }
-            while (next != hdr.tail());
+            } while (next != hdr.tail());
 
             GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
                 hdr.capacity(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index baa26d4..1ea5014 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -164,11 +164,4 @@ public interface DiscoverySpi extends IgniteSpi {
      * @throws IllegalStateException If discovery SPI has not started.
      */
     public boolean isClientMode() throws IllegalStateException;
-
-    /**
-     * Checks whether failure detection timeout is enabled for the discovery SPI.
-     *
-     * @return {@code true} if enabled, {@code false} otherwise.
-     */
-    public boolean failureDetectionTimeoutEnabled();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 6e91107..086a29d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -19,10 +19,13 @@ package org.apache.ignite.internal.processors.cache.datastructures;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
@@ -30,6 +33,8 @@ import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -38,6 +43,7 @@ import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.typedef.CA;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteClosure;
@@ -151,7 +157,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicLong(new ConstantMultipleTopologyChangeWorker());
+        doTestAtomicLong(multipleTopologyChangeWorker());
     }
 
     /**
@@ -213,7 +219,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicReference(new ConstantMultipleTopologyChangeWorker());
+        doTestAtomicReference(multipleTopologyChangeWorker());
     }
 
     /**
@@ -281,7 +287,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicStampedConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicStamped(new ConstantMultipleTopologyChangeWorker());
+        doTestAtomicStamped(multipleTopologyChangeWorker());
     }
 
     /**
@@ -360,7 +366,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
-        doTestCountDownLatch(new ConstantMultipleTopologyChangeWorker());
+        doTestCountDownLatch(multipleTopologyChangeWorker());
     }
 
     /**
@@ -432,7 +438,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testQueueConstantMultipleTopologyChange() throws Exception {
-        doTestQueue(new ConstantMultipleTopologyChangeWorker());
+        doTestQueue(multipleTopologyChangeWorker());
     }
 
     /**
@@ -573,7 +579,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicSequence(new ConstantMultipleTopologyChangeWorker());
+        doTestAtomicSequence(multipleTopologyChangeWorker());
     }
 
     /**
@@ -641,6 +647,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     }
 
     /**
+     * @return Specific multiple topology change worker implementation.
+     */
+    private ConstantTopologyChangeWorker multipleTopologyChangeWorker() {
+        return collectionCacheMode() == CacheMode.PARTITIONED ? new PartitionedMultipleTopologyChangeWorker() :
+            new MultipleTopologyChangeWorker();
+    }
+
+    /**
      *
      */
     private class ConstantTopologyChangeWorker {
@@ -653,6 +667,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
          * @return Future.
          */
         IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+            final boolean partitioned = collectionCacheMode() == CacheMode.PARTITIONED;
+
             IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
                 @Override public void apply() {
                     try {
@@ -668,8 +684,21 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                                 callback.apply(g);
                             }
                             finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    stopGrid(name);
+                                if (i != TOP_CHANGE_CNT - 1) {
+                                    stopGrid(name, !partitioned);
+
+                                    if (partitioned) {
+                                        while (true) {
+                                            try {
+                                                awaitPartitionMapExchange();
+
+                                                break;
+                                            } catch(Exception ex){
+                                                U.error(log, ex.getMessage());
+                                            }
+                                        }
+                                    }
+                                }
                             }
                         }
                     }
@@ -687,7 +716,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /**
      *
      */
-    private class ConstantMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+    private class MultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
         /**
          * Starts changing cluster's topology.
          *
@@ -719,7 +748,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                             }
                             finally {
                                 if (i != TOP_CHANGE_CNT - 1) {
-
                                     for (String name : names)
                                         stopGrid(name);
                                 }
@@ -736,4 +764,104 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             return fut;
         }
     }
+
+    /**
+     *
+     */
+    private class PartitionedMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+        /** */
+        private CyclicBarrier barrier;
+
+        /**
+         * Starts changing cluster's topology.
+         *
+         * @return Future.
+         */
+        @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+            final Semaphore sem = new Semaphore(TOP_CHANGE_THREAD_CNT);
+
+            final ConcurrentSkipListSet<String> startedNodes = new ConcurrentSkipListSet<>();
+
+            barrier = new CyclicBarrier(TOP_CHANGE_THREAD_CNT, new Runnable() {
+                @Override public void run() {
+                    try {
+                        assertEquals(TOP_CHANGE_THREAD_CNT * 3, startedNodes.size());
+
+                        for (String name : startedNodes) {
+                            stopGrid(name, false);
+
+                            while (true) {
+                                try {
+                                    awaitPartitionMapExchange();
+
+                                    break;
+                                } catch(Exception ex){
+                                    U.error(log, ex.getMessage());
+                                }
+                            }
+                        }
+
+                        startedNodes.clear();
+
+                        sem.release(TOP_CHANGE_THREAD_CNT);
+
+                        barrier.reset();
+                    }
+                    catch (Exception e) {
+                        if (failed.compareAndSet(false, true)) {
+                            sem.release(TOP_CHANGE_THREAD_CNT);
+
+                            barrier.reset();
+
+                            throw F.wrap(e);
+                        }
+                    }
+                }
+            });
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+                @Override public void apply() {
+                    try {
+                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+                            sem.acquire();
+
+                            if (failed.get())
+                                return;
+
+                            for (int j = 0; j < 3; j++) {
+                                if (failed.get())
+                                    return;
+
+                                String name = UUID.randomUUID().toString();
+
+                                startedNodes.add(name);
+
+                                Ignite g = startGrid(name);
+
+                                callback.apply(g);
+                            }
+
+                            try {
+                                barrier.await();
+                            }
+                            catch (BrokenBarrierException e) {
+                                // Ignore.
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        if (failed.compareAndSet(false, true)) {
+                            sem.release(TOP_CHANGE_THREAD_CNT);
+
+                            barrier.reset();
+
+                            throw F.wrap(e);
+                        }
+                    }
+                }
+            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+
+            return fut;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
index 18b0b21..6c880a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
@@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 public class GridCachePartitionedDataStructuresFailoverSelfTest
     extends GridCacheAbstractDataStructuresFailoverSelfTest {
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-803");
-    }
-
-    /** {@inheritDoc} */
     @Override protected CacheMode collectionCacheMode() {
         return PARTITIONED;
     }
@@ -50,4 +45,4 @@ public class GridCachePartitionedDataStructuresFailoverSelfTest
     @Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
         return TRANSACTIONAL;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5061f30d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
index a9cd470..b3ded7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
@@ -24,14 +24,10 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
 /**
  * Failover tests for cache data structures.
  */
-public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends GridCachePartitionedDataStructuresFailoverSelfTest {
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-803");
-    }
-
+public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest
+    extends GridCachePartitionedDataStructuresFailoverSelfTest {
     /** {@inheritDoc} */
     @Override protected CacheMemoryMode collectionMemoryMode() {
         return OFFHEAP_TIERED;
     }
-}
\ No newline at end of file
+}