You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/02/08 09:09:52 UTC

[ignite] branch master updated: IGNITE-11193: MVCC TX: the query with specified explicit partitions fails. This closes #6045.

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c3cc426  IGNITE-11193: MVCC TX: the query with specified explicit partitions fails. This closes #6045.
c3cc426 is described below

commit c3cc426e081d59d723215932946594359dabdbe4
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Fri Feb 8 12:09:40 2019 +0300

    IGNITE-11193: MVCC TX: the query with specified explicit partitions fails. This closes #6045.
---
 .../near/GridNearTxAbstractEnlistFuture.java       |  37 +---
 .../distributed/near/GridNearTxEnlistFuture.java   |  24 +-
 .../near/GridNearTxQueryEnlistFuture.java          | 243 +++++++++++----------
 .../near/GridNearTxQueryResultsEnlistFuture.java   |  24 +-
 .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java    |  96 ++++++++
 5 files changed, 238 insertions(+), 186 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index b31314d..e43b5e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -61,10 +60,6 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
     private static final AtomicIntegerFieldUpdater<GridNearTxAbstractEnlistFuture> DONE_UPD =
         AtomicIntegerFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, "done");
 
-    /** Done field updater. */
-    private static final AtomicReferenceFieldUpdater<GridNearTxAbstractEnlistFuture, Throwable> EX_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, Throwable.class, "ex");
-
     /** Cache context. */
     @GridToStringExclude
     protected final GridCacheContext<?, ?> cctx;
@@ -101,11 +96,6 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
     /** */
     @SuppressWarnings("unused")
     @GridToStringExclude
-    protected volatile Throwable ex;
-
-    /** */
-    @SuppressWarnings("unused")
-    @GridToStringExclude
     private volatile int done;
 
     /** Timeout object. */
@@ -316,7 +306,7 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
     /**
      */
     private void mapOnTopology() {
-        cctx.topology().readLock();
+        cctx.topology().readLock(); boolean topLocked = true;
 
         try {
             if (cctx.topology().stopping()) {
@@ -330,6 +320,8 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
 
             GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
+            cctx.topology().readUnlock(); topLocked = false;
+
             if (fut.isDone()) {
                 Throwable err = fut.validateCache(cctx, false, false, null, null);
 
@@ -363,39 +355,18 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
             }
         }
         finally {
-            if (cctx.topology().holdsLock())
+            if (topLocked)
                 cctx.topology().readUnlock();
         }
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<T> fut) {
-        if (ex != null || !EX_UPD.compareAndSet(this, null, err))
-            ex.addSuppressed(err);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancelled) {
         if (!DONE_UPD.compareAndSet(this, 0, 1))
             return false;
 
-        // Need to unlock topology to avoid deadlock with binary descriptors registration.
-        if (cctx.topology().holdsLock())
-            cctx.topology().readUnlock();
-
         cctx.tm().txContext(tx);
 
-        Throwable ex0 = ex;
-
-        if (ex0 != null) {
-            if (err != null)
-                ex0.addSuppressed(err);
-
-            err = ex0;
-        }
-
         if (!cancelled && err == null)
             tx.clearLockFuture(this);
         else
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index 396c4b5..05d9eca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -171,10 +171,6 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
 
             boolean first = (nodeId != null);
 
-            // Need to unlock topology to avoid deadlock with binary descriptors registration.
-            if (!topLocked && cctx.topology().holdsLock())
-                cctx.topology().readUnlock();
-
             for (Batch batch : next) {
                 ClusterNode node = batch.node();
 
@@ -567,13 +563,7 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
 
             topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
 
-            processFailure(topEx, null);
-
-            batches.remove(nodeId);
-
-            if (batches.isEmpty()) // Wait for all pending requests.
-                onDone();
-
+            onDone(topEx);
         }
 
         if (log.isDebugEnabled())
@@ -598,14 +588,8 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
         if (res != null)
             tx.mappings().get(nodeId).addBackups(res.newDhtNodes());
 
-        if (err != null)
-            processFailure(err, null);
-
-        if (ex != null) {
-            batches.remove(nodeId);
-
-            if (batches.isEmpty()) // Wait for all pending requests.
-                onDone();
+        if (err != null) {
+            onDone(err);
 
             return false;
         }
@@ -625,7 +609,7 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
 
         tx.hasRemoteLocks(true);
 
-        return true;
+        return !isDone();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index deeb8b7..414aee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -17,8 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
-import java.util.Collection;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -35,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQuer
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,141 +99,138 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
      * @param topLocked Topology locked flag.
      */
     @Override protected void map(final boolean topLocked) {
-        MiniFuture mini = null;
-
         try {
-            final AffinityAssignment assignment = cctx.affinity().assignment(topVer);
+            Map<ClusterNode, IntArrayHolder> map; boolean locallyMapped = false;
 
-            Collection<ClusterNode> primary;
+            AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
             if (parts != null) {
-                primary = U.newHashSet(parts.length);
+                map = U.newHashMap(parts.length);
 
                 for (int i = 0; i < parts.length; i++) {
                     ClusterNode pNode = assignment.get(parts[i]).get(0);
 
-                    primary.add(pNode);
+                    map.computeIfAbsent(pNode, n -> new IntArrayHolder()).add(parts[i]);
 
                     updateMappings(pNode);
+
+                    if (!locallyMapped && pNode.isLocal())
+                        locallyMapped = true;
                 }
             }
             else {
-                primary = assignment.primaryPartitionNodes();
+                Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
+
+                map = U.newHashMap(nodes.size());
+
+                for (ClusterNode pNode : nodes) {
+                    map.put(pNode, null);
 
-                for (ClusterNode pNode : primary)
                     updateMappings(pNode);
+
+                    if (!locallyMapped && pNode.isLocal())
+                        locallyMapped = true;
+                }
             }
 
-            if (primary.isEmpty())
+            if (map.isEmpty())
                 throw new ClusterTopologyServerNotFoundException("Failed to find data nodes for cache (all partition " +
                     "nodes left the grid).");
 
-            boolean locallyMapped = primary.contains(cctx.localNode());
-
-            if (locallyMapped)
-                add(new MiniFuture(cctx.localNode()));
+            int idx = 0; boolean first = true, clientFirst = false;
 
-            int idx = locallyMapped ? 1 : 0;
-            boolean first = true;
-            boolean clientFirst = false;
+            GridDhtTxQueryEnlistFuture localFut = null;
 
-            // Need to unlock topology to avoid deadlock with binary descriptors registration.
-            if (!topLocked && cctx.topology().holdsLock())
-                cctx.topology().readUnlock();
+            for (Map.Entry<ClusterNode, IntArrayHolder> entry : map.entrySet()) {
+                MiniFuture mini; ClusterNode node = entry.getKey(); IntArrayHolder parts = entry.getValue();
 
-            for (ClusterNode node : F.view(primary, F.remoteNodes(cctx.localNodeId()))) {
                 add(mini = new MiniFuture(node));
 
-                if (first) {
-                    clientFirst = cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
-
-                    first = false;
+                if (node.isLocal()) {
+                    localFut = new GridDhtTxQueryEnlistFuture(
+                        cctx.localNode().id(),
+                        lockVer,
+                        mvccSnapshot,
+                        threadId,
+                        futId,
+                        -(++idx), // The common tx logic expects non-zero mini-future ids (negative local and positive non-local).
+                        tx,
+                        cacheIds,
+                        parts == null ? null : parts.array(),
+                        schema,
+                        qry,
+                        params,
+                        flags,
+                        pageSize,
+                        remainingTime(),
+                        cctx);
+
+                    updateLocalFuture(localFut);
+
+                    localFut.listen(new CI1<IgniteInternalFuture<Long>>() {
+                        @Override public void apply(IgniteInternalFuture<Long> fut) {
+                            assert fut.error() != null || fut.result() != null : fut;
+
+                            try {
+                                clearLocalFuture((GridDhtTxQueryEnlistFuture)fut);
+
+                                GridNearTxQueryEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
+
+                                mini.onResult(res, fut.error());
+                            }
+                            catch (IgniteCheckedException e) {
+                                mini.onResult(null, e);
+                            }
+                            finally {
+                                CU.unwindEvicts(cctx);
+                            }
+                        }
+                    });
                 }
+                else {
+                    if (first) {
+                        clientFirst = cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
 
-                GridNearTxQueryEnlistRequest req = new GridNearTxQueryEnlistRequest(
-                    cctx.cacheId(),
-                    threadId,
-                    futId,
-                    ++idx,
-                    tx.subjectId(),
-                    topVer,
-                    lockVer,
-                    mvccSnapshot,
-                    cacheIds,
-                    parts,
-                    schema,
-                    qry,
-                    params,
-                    flags,
-                    pageSize,
-                    remainingTime(),
-                    tx.remainingTime(),
-                    tx.taskNameHash(),
-                    clientFirst
-                );
-
-                sendRequest(req, node.id(), mini);
-            }
-
-            if (locallyMapped) {
-                final MiniFuture localMini = mini = miniFuture(-1);
-
-                assert localMini != null;
-
-                GridDhtTxQueryEnlistFuture fut = new GridDhtTxQueryEnlistFuture(
-                    cctx.localNode().id(),
-                    lockVer,
-                    mvccSnapshot,
-                    threadId,
-                    futId,
-                    -1,
-                    tx,
-                    cacheIds,
-                    parts,
-                    schema,
-                    qry,
-                    params,
-                    flags,
-                    pageSize,
-                    remainingTime(),
-                    cctx);
-
-                updateLocalFuture(fut);
-
-                fut.listen(new CI1<IgniteInternalFuture<Long>>() {
-                    @Override public void apply(IgniteInternalFuture<Long> fut) {
-                        assert fut.error() != null || fut.result() != null : fut;
-
-                        try {
-                            clearLocalFuture((GridDhtTxQueryEnlistFuture)fut);
-
-                            GridNearTxQueryEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
-
-                            localMini.onResult(res, fut.error());
-                        }
-                        catch (IgniteCheckedException e) {
-                            localMini.onResult(null, e);
-                        }
-                        finally {
-                            CU.unwindEvicts(cctx);
-                        }
+                        first = false;
                     }
-                });
 
-                fut.init();
+                    GridNearTxQueryEnlistRequest req = new GridNearTxQueryEnlistRequest(
+                        cctx.cacheId(),
+                        threadId,
+                        futId,
+                        ++idx, // The common tx logic expects non-zero mini-future ids (negative local and positive non-local).
+                        tx.subjectId(),
+                        topVer,
+                        lockVer,
+                        mvccSnapshot,
+                        cacheIds,
+                        parts == null ? null : parts.array(),
+                        schema,
+                        qry,
+                        params,
+                        flags,
+                        pageSize,
+                        remainingTime(),
+                        tx.remainingTime(),
+                        tx.taskNameHash(),
+                        clientFirst
+                    );
+
+                    sendRequest(req, node.id(), mini);
+                }
             }
+
+            markInitialized();
+
+            if (localFut != null)
+                localFut.init();
         }
         catch (Throwable e) {
-            if (mini != null)
-                mini.onResult(null, e);
-            else
-                onDone(e);
+            onDone(e);
 
             if (e instanceof Error)
                 throw (Error)e;
         }
-
-        markInitialized();
     }
 
     /**
@@ -291,19 +288,10 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
      * @param miniId Mini ID to find.
      * @return Mini future.
      */
-    private MiniFuture miniFuture(int miniId) {
-         synchronized (this) {
-            int idx = Math.abs(miniId) - 1;
+    private synchronized MiniFuture miniFuture(int miniId) {
+        IgniteInternalFuture<Long> fut = future(Math.abs(miniId) - 1);
 
-            assert idx >= 0 && idx < futuresCountNoLock();
-
-            IgniteInternalFuture<Long> fut = future(idx);
-
-            if (!fut.isDone())
-                return (MiniFuture)fut;
-        }
-
-        return null;
+        return !fut.isDone() ? (MiniFuture)fut : null;
     }
 
     /**
@@ -404,4 +392,33 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
             return err != null ? onDone(err) : onDone(res.result(), res.error());
         }
     }
+
+    /** Growable int buffer; {@code map()} keeps one per primary node to collect the explicit partitions mapped to it. */
+    private static class IntArrayHolder {
+        /** Backing storage: {@code null} until the first {@code add}, possibly longer than {@code size} afterwards. */
+        private int[] array;
+        /** Number of values stored so far, i.e. the used prefix of {@code array}. */
+        private int size;
+
+        /** Appends {@code i}: allocates 4 slots on first use and doubles the storage whenever it is full. */
+        void add(int i) {
+            if (array == null)
+                array = new int[4];
+
+            if (array.length == size)
+                array = Arrays.copyOf(array, size << 1);
+
+            array[size++] = i;
+        }
+
+        /** @return Exact-length array of the added values (not always a copy), or {@code null} if none were added. */
+        public int[] array() {
+            if (array == null)
+                return null;
+            else if (size == array.length)
+                return array;
+            else
+                return Arrays.copyOf(array, size);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index e3bbed4..dc8afb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -152,10 +152,6 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
 
             boolean first = (nodeId != null);
 
-            // Need to unlock topology to avoid deadlock with binary descriptors registration.
-            if (!topLocked && cctx.topology().holdsLock())
-                cctx.topology().readUnlock();
-
             for (Batch batch : next) {
                 ClusterNode node = batch.node();
 
@@ -534,13 +530,7 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
 
             topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
 
-            processFailure(topEx, null);
-
-            batches.remove(nodeId);
-
-            if (batches.isEmpty()) // Wait for all pending requests.
-                onDone();
-
+            onDone(topEx);
         }
 
         if (log.isDebugEnabled())
@@ -565,14 +555,8 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
         if (res != null)
             tx.mappings().get(nodeId).addBackups(res.newDhtNodes());
 
-        if (err != null)
-            processFailure(err, null);
-
-        if (ex != null) {
-            batches.remove(nodeId);
-
-            if (batches.isEmpty()) // Wait for all pending requests.
-                onDone();
+        if (err != null) {
+            onDone(err);
 
             return false;
         }
@@ -583,7 +567,7 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
 
         tx.hasRemoteLocks(true);
 
-        return true;
+        return !isDone();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
index 27cb8f2..d4b4620 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.mvcc;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -40,6 +41,7 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -1756,6 +1758,100 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
      * @throws Exception If failed.
      */
     @Test
+    public void testUpdateExplicitPartitionsWithoutReducer() throws Exception {
+        ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 10)
+            .setIndexedTypes(Integer.class, Integer.class);
+
+        Ignite ignite = startGridsMultiThreaded(4);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        Affinity<Object> affinity = internalCache0(cache).affinity();
+
+        int keysCnt = 10, retryCnt = 0;
+
+        Integer test = 0;
+
+        Map<Integer, Integer> vals = new LinkedHashMap<>();
+
+        while (vals.size() < keysCnt) {
+            int partition = affinity.partition(test);
+
+            if (partition == 1 || partition == 2)
+                vals.put(test, 0);
+            else
+                assertTrue("Maximum retry number exceeded", ++retryCnt < 1000);
+
+            test++;
+        }
+
+        cache.putAll(vals);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer set _val=2").setPartitions(1,2);
+
+        List<List<?>> all = cache.query(qry).getAll();
+
+        assertEquals(Long.valueOf(keysCnt), all.stream().findFirst().orElseThrow(AssertionError::new).get(0));
+
+        List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _val FROM Integer")).getAll();
+
+        assertEquals(keysCnt, rows.size());
+        assertTrue(rows.stream().map(r -> r.get(0)).map(Integer.class::cast).allMatch(v -> v == 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateExplicitPartitionsWithReducer() throws Exception {
+        ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 10)
+            .setIndexedTypes(Integer.class, Integer.class);
+
+        Ignite ignite = startGridsMultiThreaded(4);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        Affinity<Object> affinity = internalCache0(cache).affinity();
+
+        int keysCnt = 10, retryCnt = 0;
+
+        Integer test = 0;
+
+        Map<Integer, Integer> vals = new LinkedHashMap<>();
+
+        while (vals.size() < keysCnt) {
+            int partition = affinity.partition(test);
+
+            if (partition == 1 || partition == 2)
+                vals.put(test, 0);
+            else
+                assertTrue("Maximum retry number exceeded", ++retryCnt < 1000);
+
+            test++;
+        }
+
+        cache.putAll(vals);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer set _val=(SELECT 2 FROM DUAL)").setPartitions(1,2);
+
+        List<List<?>> all = cache.query(qry).getAll();
+
+        assertEquals(Long.valueOf(keysCnt), all.stream().findFirst().orElseThrow(AssertionError::new).get(0));
+
+        List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _val FROM Integer")).getAll();
+
+        assertEquals(keysCnt, rows.size());
+        assertTrue(rows.stream().map(r -> r.get(0)).map(Integer.class::cast).allMatch(v -> v == 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
     public void testFastInsertUpdateConcurrent() throws Exception {
         ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT)
             .setIndexedTypes(Integer.class, Integer.class);