You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/16 07:29:16 UTC

ignite git commit: IGNITE-1239 - Fixed scan query failover on changing topology.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1239 [created] 471b75e70


IGNITE-1239 - Fixed scan query failover on changing topology.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/471b75e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/471b75e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/471b75e7

Branch: refs/heads/ignite-1239
Commit: 471b75e70398c7db7e621e7afa91d2c62fbc258d
Parents: d928ef4
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 15 22:29:11 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 15 22:29:11 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  45 ++++---
 .../query/GridCacheDistributedQueryFuture.java  |  25 +++-
 .../cache/query/GridCacheLocalQueryFuture.java  |   5 +
 .../cache/query/GridCacheQueryAdapter.java      | 116 +++++++++++++++----
 .../query/GridCacheQueryFutureAdapter.java      |  11 +-
 .../cache/query/GridCacheQueryManager.java      |  11 +-
 ...CacheScanPartitionQueryFallbackSelfTest.java |  42 ++++---
 7 files changed, 183 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..4c24a51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -745,16 +745,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                 int part = ctx.affinity().partition(cacheKey);
 
-                boolean nearKey;
+                boolean dhtKey = false;
 
-                if (!(modes.near && modes.primary && modes.backup)) {
+                if (modes.primary || modes.backup) {
                     boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer);
 
                     if (keyPrimary) {
                         if (!modes.primary)
                             return null;
 
-                        nearKey = false;
+                        dhtKey = true;
                     }
                     else {
                         boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part, topVer);
@@ -763,36 +763,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                             if (!modes.backup)
                                 return null;
 
-                            nearKey = false;
-                        }
-                        else {
-                            if (!modes.near)
-                                return null;
-
-                            nearKey = true;
-
-                            // Swap and offheap are disabled for near cache.
-                            modes.offheap = false;
-                            modes.swap = false;
+                            dhtKey = true;
                         }
                     }
-                }
-                else {
-                    nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer);
 
-                    if (nearKey) {
-                        // Swap and offheap are disabled for near cache.
-                        modes.offheap = false;
-                        modes.swap = false;
-                    }
+                    // We will always peek DHT entry if both primary and backup flags are set, regardless of
+                    // affinity calculation.
+                    // This is required because there are scenarios when neither primary nor backup node is an owner,
+                    // but we need to be able to peek cache value.
+                    dhtKey |= (modes.primary && modes.backup);
                 }
 
-                if (nearKey && !ctx.isNear())
+                if (!dhtKey && !ctx.isNear())
                     return null;
 
                 if (modes.heap) {
-                    GridCacheEntryEx e = nearKey ? peekEx(cacheKey) :
-                        (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
+                    GridCacheEntryEx e = (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
 
                     if (e != null) {
                         cacheVal = e.peek(modes.heap, modes.offheap, modes.swap, topVer, plc);
@@ -800,6 +786,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         modes.offheap = false;
                         modes.swap = false;
                     }
+
+                    if (cacheVal == null && modes.near && ctx.isNear()) {
+                        e = peekEx(cacheKey);
+
+                        cacheVal = e.peek(modes.heap, false, false, topVer, plc);
+
+                        modes.offheap = false;
+                        modes.swap = false;
+                    }
                 }
 
                 if (modes.offheap || modes.swap) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 1d547c5..06d0625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
@@ -144,7 +145,24 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
         if (callOnPage)
             // We consider node departure as a reception of last empty
             // page from this node.
-            onPage(nodeId, Collections.emptyList(), null, true);
+            onPage(nodeId, Collections.emptyList(),
+                new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId), true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitFirstPage() throws IgniteCheckedException {
+        try {
+            firstPageLatch.await();
+
+            if (isDone() && error() != null)
+                // Throw the exception if future failed.
+                get();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -229,9 +247,12 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
 
     /** {@inheritDoc} */
     @Override public boolean onDone(Collection<R> res, Throwable err) {
+        boolean done = super.onDone(res, err);
+
+        // Must release the latch after onDone() in order for a waiting thread to see an exception, if any.
         firstPageLatch.countDown();
 
-        return super.onDone(res, err);
+        return done;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index 46af18a..248dfa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -77,6 +77,11 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public void awaitFirstPage() throws IgniteCheckedException {
+        get();
+    }
+
     /** */
     private class LocalQueryRunnable implements GridPlainRunnable {
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 3ac5746..9b1fb7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -34,6 +35,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
@@ -41,6 +43,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -63,13 +66,6 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
  * Query adapter.
  */
 public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
-    /** Is local node predicate. */
-    private static final IgnitePredicate<ClusterNode> IS_LOC_NODE = new IgnitePredicate<ClusterNode>() {
-        @Override public boolean apply(ClusterNode n) {
-            return n.isLocal();
-        }
-    };
-
     /** */
     private final GridCacheContext<?, ?> cctx;
 
@@ -446,7 +442,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
-        if (nodes.isEmpty())
+        if (nodes.isEmpty() && (type != SCAN || part == null))
             return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());
 
         if (log.isDebugEnabled())
@@ -477,8 +473,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         if (type == SQL_FIELDS || type == SPI)
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
-        else if (type == SCAN && part != null && nodes.size() > 1)
-            return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, cctx);
+        else if (type == SCAN && part != null)
+            return new CacheQueryFallbackFuture<>(part, bean, qryMgr, cctx);
         else
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
     }
@@ -581,37 +577,47 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         /** Partition. */
         private final int part;
 
+        /** Flag indicating that the first item has been returned to the user. */
+        private boolean firstItemReturned;
+
         /**
-         * @param nodes Backups.
          * @param part Partition.
          * @param bean Bean.
          * @param qryMgr Query manager.
          * @param cctx Cache context.
          */
-        public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, int part, GridCacheQueryBean bean,
+        private CacheQueryFallbackFuture(int part, GridCacheQueryBean bean,
             GridCacheQueryManager qryMgr, GridCacheContext cctx) {
-            this.nodes = fallbacks(nodes);
             this.bean = bean;
             this.qryMgr = qryMgr;
             this.cctx = cctx;
             this.part = part;
 
+            nodes = fallbacks(cctx.discovery().topologyVersionEx());
+
             init();
         }
 
         /**
-         * @param nodes Nodes.
          * @return Nodes for query execution.
          */
-        private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
-            Queue<ClusterNode> fallbacks = new LinkedList<>();
+        private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
+            Deque<ClusterNode> fallbacks = new LinkedList<>();
+            Collection<ClusterNode> owners = new HashSet<>();
 
-            ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
+            for (ClusterNode node : cctx.topology().owners(part, topVer)) {
+                if (node.isLocal())
+                    fallbacks.addFirst(node);
+                else
+                    fallbacks.add(node);
 
-            if (node != null)
-                fallbacks.add(node);
+                owners.add(node);
+            }
 
-            fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes);
+            for (ClusterNode node : cctx.topology().moving(part)) {
+                if (!owners.contains(node))
+                    fallbacks.add(node);
+            }
 
             return fallbacks;
         }
@@ -651,7 +657,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                                         @Override public void apply(
                                             IgniteInternalFuture<AffinityTopologyVersion> future) {
 
-                                            nodes = fallbacks(cctx.topology().owners(part, topVer));
+                                            nodes = fallbacks(topVer);
 
                                             // Race is impossible here because query retries are executed one by one.
                                             unreservedTopVer = null;
@@ -684,7 +690,73 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         /** {@inheritDoc} */
         @Override public R next() {
-            return fut.next();
+            if (firstItemReturned)
+                return fut.next();
+
+            while (true) {
+                try {
+                    fut.awaitFirstPage();
+
+                    firstItemReturned = true;
+
+                    return fut.next();
+                }
+                catch (IgniteClientDisconnectedCheckedException e) {
+                    throw U.convertException(e);
+                }
+                catch (IgniteCheckedException e) {
+                    retryIfPossible(e);
+                }
+            }
+        }
+
+        /**
+         * Calls {@code init()} again if {@code e} has a topology-related cause; otherwise rethrows it unchecked.
+         *
+         * @param e Exception for query run; its whole cause chain is inspected.
+         */
+        private void retryIfPossible(IgniteCheckedException e) {
+            try {
+                IgniteInternalFuture<?> retryFut;
+
+                if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
+                    AffinityTopologyVersion waitVer = X.cause(e, GridDhtUnreservedPartitionException.class).topologyVersion();
+                    assert waitVer != null;
+
+                    retryFut = cctx.affinity().affinityReadyFuture(waitVer);
+                }
+                else if (e.hasCause(ClusterTopologyCheckedException.class)) {
+                    ClusterTopologyCheckedException topEx = X.cause(e, ClusterTopologyCheckedException.class);
+
+                    retryFut = topEx.retryReadyFuture();
+                }
+                else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
+                    ClusterGroupEmptyCheckedException ex = X.cause(e, ClusterGroupEmptyCheckedException.class);
+
+                    retryFut = ex.retryReadyFuture();
+                }
+                else
+                    throw U.convertException(e);
+
+                if (F.isEmpty(nodes)) {
+                    if (--unreservedNodesRetryCnt > 0) {
+                        if (retryFut != null)
+                            retryFut.get();
+
+                        nodes = fallbacks(unreservedTopVer == null ? cctx.discovery().topologyVersionEx() : unreservedTopVer);
+                        unreservedTopVer = null;
+
+                        init();
+                    }
+                    else
+                        throw U.convertException(e);
+                }
+                else
+                    init();
+            }
+            catch (IgniteCheckedException ex) {
+                throw U.convertException(ex);
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index ad9ee39..2a4fbda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -183,6 +183,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     }
 
     /**
+     * Waits for the first page to be received from remote node(s), if any.
+     *
+     * @throws IgniteCheckedException If query execution failed with an error.
+     */
+    public abstract void awaitFirstPage() throws IgniteCheckedException;
+
+    /**
      * Returns next page for the query.
      *
      * @return Next page or {@code null} if no more pages available.
@@ -380,13 +387,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
                 synchronized (mux) {
                     enqueue(Collections.emptyList());
 
-                    onPage(nodeId, true);
-
                     onDone(nodeId != null ?
                         new IgniteCheckedException("Failed to execute query on node [query=" + qry +
                             ", nodeId=" + nodeId + "]", err) :
                         new IgniteCheckedException("Failed to execute query locally: " + qry, err));
 
+                    onPage(nodeId, true);
+
                     mux.notifyAll();
                 }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 1d934d8..a7b55eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1557,7 +1557,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 }
             }
             catch (Throwable e) {
-                U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
+                if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+                    U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
 
                 onPageReady(loc, qryInfo, null, true, e);
 
@@ -1572,8 +1573,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
                         }
                         catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
-                                cctx.nodeId() + "]", e);
+                            if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+                                U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
+                                    cctx.nodeId() + "]", e);
                         }
                     }
                 }
@@ -1694,7 +1696,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     fut.get().closeIfNotShared(recipient(sndId, reqId));
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to close iterator.", e);
+                    if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+                        U.error(log, "Failed to close iterator.", e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 1ef470a..c087ffc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
@@ -44,14 +45,12 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -103,6 +102,9 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     /** Test entries. */
     private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
 
+    /** */
+    private boolean syncRebalance;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -120,6 +122,10 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
         ccfg.setCacheMode(cacheMode);
         ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
         ccfg.setBackups(backups);
+
+        if (syncRebalance)
+            ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
         ccfg.setNearConfiguration(null);
 
         cfg.setCacheConfiguration(ccfg);
@@ -192,8 +198,9 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     public void testScanFallbackOnRebalancing() throws Exception {
         cacheMode = CacheMode.PARTITIONED;
         clientMode = false;
-        backups = 1;
+        backups = 2;
         commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+        syncRebalance = true;
 
         try {
             Ignite ignite = startGrids(GRID_CNT);
@@ -214,6 +221,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
                             Thread.sleep(3000);
 
+                            info("Will stop grid: " + getTestGridName(id));
+
                             stopGrid(id);
 
                             if (done.get())
@@ -224,7 +233,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
                         return null;
                     }
-                }, GRID_CNT);
+                }, 2);
 
             final AtomicInteger nodeIdx = new AtomicInteger();
 
@@ -233,18 +242,18 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
                     @Override public Object call() throws Exception {
                         int nodeId = nodeIdx.getAndIncrement();
 
-                        IgniteCacheProxy<Integer, Integer> cache = (IgniteCacheProxy<Integer, Integer>)
-                            grid(nodeId).<Integer, Integer>cache(null);
+                        IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
 
                         while (!done.get()) {
-                            IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context());
+                            int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
 
-                            int part = tup.get1();
+                            info("Running query [node=" + nodeId + ", part=" + part + ']');
 
-                            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(
-                                null, part, false);
+                            try (QueryCursor<Cache.Entry<Integer, Integer>> cur =
+                                     cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
 
-                            doTestScanQuery(qry, part);
+                                doTestScanQueryCursor(cur, part);
+                            }
                         }
 
                         return null;
@@ -270,8 +279,6 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
      * @throws Exception In case of error.
      */
     public void testScanFallbackOnRebalancingCursor() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1239");
-
         cacheMode = CacheMode.PARTITIONED;
         clientMode = false;
         backups = 1;
@@ -311,12 +318,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
                         while (!done.get()) {
                             int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
 
-                            QueryCursor<Cache.Entry<Integer, Integer>> cur =
-                                cache.query(new ScanQuery<Integer, Integer>(part));
+                            info("Running query [node=" + nodeId + ", part=" + part + ']');
 
-                            U.debug(log, "Running query [node=" + nodeId + ", part=" + part + ']');
+                            try (QueryCursor<Cache.Entry<Integer, Integer>> cur =
+                                cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
 
-                            doTestScanQueryCursor(cur, part);
+                                doTestScanQueryCursor(cur, part);
+                            }
                         }
 
                         return null;