You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/02/08 09:09:52 UTC
[ignite] branch master updated: IGNITE-11193: MVCC TX: the query with specified explicit partitions fails. This closes #6045.
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c3cc426 IGNITE-11193: MVCC TX: the query with specified explicit partitions fails. This closes #6045.
c3cc426 is described below
commit c3cc426e081d59d723215932946594359dabdbe4
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Fri Feb 8 12:09:40 2019 +0300
IGNITE-11193: MVCC TX: the query with specified explicit partitions fails. This closes #6045.
---
.../near/GridNearTxAbstractEnlistFuture.java | 37 +---
.../distributed/near/GridNearTxEnlistFuture.java | 24 +-
.../near/GridNearTxQueryEnlistFuture.java | 243 +++++++++++----------
.../near/GridNearTxQueryResultsEnlistFuture.java | 24 +-
.../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 96 ++++++++
5 files changed, 238 insertions(+), 186 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index b31314d..e43b5e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCacheRestartingException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -61,10 +60,6 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
private static final AtomicIntegerFieldUpdater<GridNearTxAbstractEnlistFuture> DONE_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, "done");
- /** Done field updater. */
- private static final AtomicReferenceFieldUpdater<GridNearTxAbstractEnlistFuture, Throwable> EX_UPD =
- AtomicReferenceFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, Throwable.class, "ex");
-
/** Cache context. */
@GridToStringExclude
protected final GridCacheContext<?, ?> cctx;
@@ -101,11 +96,6 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
/** */
@SuppressWarnings("unused")
@GridToStringExclude
- protected volatile Throwable ex;
-
- /** */
- @SuppressWarnings("unused")
- @GridToStringExclude
private volatile int done;
/** Timeout object. */
@@ -316,7 +306,7 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
/**
*/
private void mapOnTopology() {
- cctx.topology().readLock();
+ cctx.topology().readLock(); boolean topLocked = true;
try {
if (cctx.topology().stopping()) {
@@ -330,6 +320,8 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
+ cctx.topology().readUnlock(); topLocked = false;
+
if (fut.isDone()) {
Throwable err = fut.validateCache(cctx, false, false, null, null);
@@ -363,39 +355,18 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
}
}
finally {
- if (cctx.topology().holdsLock())
+ if (topLocked)
cctx.topology().readUnlock();
}
}
/** {@inheritDoc} */
- @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<T> fut) {
- if (ex != null || !EX_UPD.compareAndSet(this, null, err))
- ex.addSuppressed(err);
-
- return true;
- }
-
- /** {@inheritDoc} */
@Override public boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancelled) {
if (!DONE_UPD.compareAndSet(this, 0, 1))
return false;
- // Need to unlock topology to avoid deadlock with binary descriptors registration.
- if (cctx.topology().holdsLock())
- cctx.topology().readUnlock();
-
cctx.tm().txContext(tx);
- Throwable ex0 = ex;
-
- if (ex0 != null) {
- if (err != null)
- ex0.addSuppressed(err);
-
- err = ex0;
- }
-
if (!cancelled && err == null)
tx.clearLockFuture(this);
else
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index 396c4b5..05d9eca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -171,10 +171,6 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
boolean first = (nodeId != null);
- // Need to unlock topology to avoid deadlock with binary descriptors registration.
- if (!topLocked && cctx.topology().holdsLock())
- cctx.topology().readUnlock();
-
for (Batch batch : next) {
ClusterNode node = batch.node();
@@ -567,13 +563,7 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
- processFailure(topEx, null);
-
- batches.remove(nodeId);
-
- if (batches.isEmpty()) // Wait for all pending requests.
- onDone();
-
+ onDone(topEx);
}
if (log.isDebugEnabled())
@@ -598,14 +588,8 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
if (res != null)
tx.mappings().get(nodeId).addBackups(res.newDhtNodes());
- if (err != null)
- processFailure(err, null);
-
- if (ex != null) {
- batches.remove(nodeId);
-
- if (batches.isEmpty()) // Wait for all pending requests.
- onDone();
+ if (err != null) {
+ onDone(err);
return false;
}
@@ -625,7 +609,7 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
tx.hasRemoteLocks(true);
- return true;
+ return !isDone();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index deeb8b7..414aee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -17,8 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -35,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQuer
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,141 +99,138 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
* @param topLocked Topology locked flag.
*/
@Override protected void map(final boolean topLocked) {
- MiniFuture mini = null;
-
try {
- final AffinityAssignment assignment = cctx.affinity().assignment(topVer);
+ Map<ClusterNode, IntArrayHolder> map; boolean locallyMapped = false;
- Collection<ClusterNode> primary;
+ AffinityAssignment assignment = cctx.affinity().assignment(topVer);
if (parts != null) {
- primary = U.newHashSet(parts.length);
+ map = U.newHashMap(parts.length);
for (int i = 0; i < parts.length; i++) {
ClusterNode pNode = assignment.get(parts[i]).get(0);
- primary.add(pNode);
+ map.computeIfAbsent(pNode, n -> new IntArrayHolder()).add(parts[i]);
updateMappings(pNode);
+
+ if (!locallyMapped && pNode.isLocal())
+ locallyMapped = true;
}
}
else {
- primary = assignment.primaryPartitionNodes();
+ Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
+
+ map = U.newHashMap(nodes.size());
+
+ for (ClusterNode pNode : nodes) {
+ map.put(pNode, null);
- for (ClusterNode pNode : primary)
updateMappings(pNode);
+
+ if (!locallyMapped && pNode.isLocal())
+ locallyMapped = true;
+ }
}
- if (primary.isEmpty())
+ if (map.isEmpty())
throw new ClusterTopologyServerNotFoundException("Failed to find data nodes for cache (all partition " +
"nodes left the grid).");
- boolean locallyMapped = primary.contains(cctx.localNode());
-
- if (locallyMapped)
- add(new MiniFuture(cctx.localNode()));
+ int idx = 0; boolean first = true, clientFirst = false;
- int idx = locallyMapped ? 1 : 0;
- boolean first = true;
- boolean clientFirst = false;
+ GridDhtTxQueryEnlistFuture localFut = null;
- // Need to unlock topology to avoid deadlock with binary descriptors registration.
- if (!topLocked && cctx.topology().holdsLock())
- cctx.topology().readUnlock();
+ for (Map.Entry<ClusterNode, IntArrayHolder> entry : map.entrySet()) {
+ MiniFuture mini; ClusterNode node = entry.getKey(); IntArrayHolder parts = entry.getValue();
- for (ClusterNode node : F.view(primary, F.remoteNodes(cctx.localNodeId()))) {
add(mini = new MiniFuture(node));
- if (first) {
- clientFirst = cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
-
- first = false;
+ if (node.isLocal()) {
+ localFut = new GridDhtTxQueryEnlistFuture(
+ cctx.localNode().id(),
+ lockVer,
+ mvccSnapshot,
+ threadId,
+ futId,
+ -(++idx), // The common tx logic expects non-zero mini-future ids (negative local and positive non-local).
+ tx,
+ cacheIds,
+ parts == null ? null : parts.array(),
+ schema,
+ qry,
+ params,
+ flags,
+ pageSize,
+ remainingTime(),
+ cctx);
+
+ updateLocalFuture(localFut);
+
+ localFut.listen(new CI1<IgniteInternalFuture<Long>>() {
+ @Override public void apply(IgniteInternalFuture<Long> fut) {
+ assert fut.error() != null || fut.result() != null : fut;
+
+ try {
+ clearLocalFuture((GridDhtTxQueryEnlistFuture)fut);
+
+ GridNearTxQueryEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
+
+ mini.onResult(res, fut.error());
+ }
+ catch (IgniteCheckedException e) {
+ mini.onResult(null, e);
+ }
+ finally {
+ CU.unwindEvicts(cctx);
+ }
+ }
+ });
}
+ else {
+ if (first) {
+ clientFirst = cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
- GridNearTxQueryEnlistRequest req = new GridNearTxQueryEnlistRequest(
- cctx.cacheId(),
- threadId,
- futId,
- ++idx,
- tx.subjectId(),
- topVer,
- lockVer,
- mvccSnapshot,
- cacheIds,
- parts,
- schema,
- qry,
- params,
- flags,
- pageSize,
- remainingTime(),
- tx.remainingTime(),
- tx.taskNameHash(),
- clientFirst
- );
-
- sendRequest(req, node.id(), mini);
- }
-
- if (locallyMapped) {
- final MiniFuture localMini = mini = miniFuture(-1);
-
- assert localMini != null;
-
- GridDhtTxQueryEnlistFuture fut = new GridDhtTxQueryEnlistFuture(
- cctx.localNode().id(),
- lockVer,
- mvccSnapshot,
- threadId,
- futId,
- -1,
- tx,
- cacheIds,
- parts,
- schema,
- qry,
- params,
- flags,
- pageSize,
- remainingTime(),
- cctx);
-
- updateLocalFuture(fut);
-
- fut.listen(new CI1<IgniteInternalFuture<Long>>() {
- @Override public void apply(IgniteInternalFuture<Long> fut) {
- assert fut.error() != null || fut.result() != null : fut;
-
- try {
- clearLocalFuture((GridDhtTxQueryEnlistFuture)fut);
-
- GridNearTxQueryEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
-
- localMini.onResult(res, fut.error());
- }
- catch (IgniteCheckedException e) {
- localMini.onResult(null, e);
- }
- finally {
- CU.unwindEvicts(cctx);
- }
+ first = false;
}
- });
- fut.init();
+ GridNearTxQueryEnlistRequest req = new GridNearTxQueryEnlistRequest(
+ cctx.cacheId(),
+ threadId,
+ futId,
+ ++idx, // The common tx logic expects non-zero mini-future ids (negative local and positive non-local).
+ tx.subjectId(),
+ topVer,
+ lockVer,
+ mvccSnapshot,
+ cacheIds,
+ parts == null ? null : parts.array(),
+ schema,
+ qry,
+ params,
+ flags,
+ pageSize,
+ remainingTime(),
+ tx.remainingTime(),
+ tx.taskNameHash(),
+ clientFirst
+ );
+
+ sendRequest(req, node.id(), mini);
+ }
}
+
+ markInitialized();
+
+ if (localFut != null)
+ localFut.init();
}
catch (Throwable e) {
- if (mini != null)
- mini.onResult(null, e);
- else
- onDone(e);
+ onDone(e);
if (e instanceof Error)
throw (Error)e;
}
-
- markInitialized();
}
/**
@@ -291,19 +288,10 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
* @param miniId Mini ID to find.
* @return Mini future.
*/
- private MiniFuture miniFuture(int miniId) {
- synchronized (this) {
- int idx = Math.abs(miniId) - 1;
+ private synchronized MiniFuture miniFuture(int miniId) {
+ IgniteInternalFuture<Long> fut = future(Math.abs(miniId) - 1);
- assert idx >= 0 && idx < futuresCountNoLock();
-
- IgniteInternalFuture<Long> fut = future(idx);
-
- if (!fut.isDone())
- return (MiniFuture)fut;
- }
-
- return null;
+ return !fut.isDone() ? (MiniFuture)fut : null;
}
/**
@@ -404,4 +392,33 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
return err != null ? onDone(err) : onDone(res.result(), res.error());
}
}
+
+ /** */
+ private static class IntArrayHolder {
+ /** */
+ private int[] array;
+ /** */
+ private int size;
+
+ /** */
+ void add(int i) {
+ if (array == null)
+ array = new int[4];
+
+ if (array.length == size)
+ array = Arrays.copyOf(array, size << 1);
+
+ array[size++] = i;
+ }
+
+ /** */
+ public int[] array() {
+ if (array == null)
+ return null;
+ else if (size == array.length)
+ return array;
+ else
+ return Arrays.copyOf(array, size);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index e3bbed4..dc8afb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -152,10 +152,6 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
boolean first = (nodeId != null);
- // Need to unlock topology to avoid deadlock with binary descriptors registration.
- if (!topLocked && cctx.topology().holdsLock())
- cctx.topology().readUnlock();
-
for (Batch batch : next) {
ClusterNode node = batch.node();
@@ -534,13 +530,7 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
- processFailure(topEx, null);
-
- batches.remove(nodeId);
-
- if (batches.isEmpty()) // Wait for all pending requests.
- onDone();
-
+ onDone(topEx);
}
if (log.isDebugEnabled())
@@ -565,14 +555,8 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
if (res != null)
tx.mappings().get(nodeId).addBackups(res.newDhtNodes());
- if (err != null)
- processFailure(err, null);
-
- if (ex != null) {
- batches.remove(nodeId);
-
- if (batches.isEmpty()) // Wait for all pending requests.
- onDone();
+ if (err != null) {
+ onDone(err);
return false;
}
@@ -583,7 +567,7 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
tx.hasRemoteLocks(true);
- return true;
+ return !isDone();
}
/** {@inheritDoc} */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
index 27cb8f2..d4b4620 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.mvcc;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -40,6 +41,7 @@ import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -1756,6 +1758,100 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
* @throws Exception If failed.
*/
@Test
+ public void testUpdateExplicitPartitionsWithoutReducer() throws Exception {
+ ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 10)
+ .setIndexedTypes(Integer.class, Integer.class);
+
+ Ignite ignite = startGridsMultiThreaded(4);
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+ Affinity<Object> affinity = internalCache0(cache).affinity();
+
+ int keysCnt = 10, retryCnt = 0;
+
+ Integer test = 0;
+
+ Map<Integer, Integer> vals = new LinkedHashMap<>();
+
+ while (vals.size() < keysCnt) {
+ int partition = affinity.partition(test);
+
+ if (partition == 1 || partition == 2)
+ vals.put(test, 0);
+ else
+ assertTrue("Maximum retry number exceeded", ++retryCnt < 1000);
+
+ test++;
+ }
+
+ cache.putAll(vals);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer set _val=2").setPartitions(1,2);
+
+ List<List<?>> all = cache.query(qry).getAll();
+
+ assertEquals(Long.valueOf(keysCnt), all.stream().findFirst().orElseThrow(AssertionError::new).get(0));
+
+ List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _val FROM Integer")).getAll();
+
+ assertEquals(keysCnt, rows.size());
+ assertTrue(rows.stream().map(r -> r.get(0)).map(Integer.class::cast).allMatch(v -> v == 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdateExplicitPartitionsWithReducer() throws Exception {
+ ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 10)
+ .setIndexedTypes(Integer.class, Integer.class);
+
+ Ignite ignite = startGridsMultiThreaded(4);
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+ Affinity<Object> affinity = internalCache0(cache).affinity();
+
+ int keysCnt = 10, retryCnt = 0;
+
+ Integer test = 0;
+
+ Map<Integer, Integer> vals = new LinkedHashMap<>();
+
+ while (vals.size() < keysCnt) {
+ int partition = affinity.partition(test);
+
+ if (partition == 1 || partition == 2)
+ vals.put(test, 0);
+ else
+ assertTrue("Maximum retry number exceeded", ++retryCnt < 1000);
+
+ test++;
+ }
+
+ cache.putAll(vals);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer set _val=(SELECT 2 FROM DUAL)").setPartitions(1,2);
+
+ List<List<?>> all = cache.query(qry).getAll();
+
+ assertEquals(Long.valueOf(keysCnt), all.stream().findFirst().orElseThrow(AssertionError::new).get(0));
+
+ List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _val FROM Integer")).getAll();
+
+ assertEquals(keysCnt, rows.size());
+ assertTrue(rows.stream().map(r -> r.get(0)).map(Integer.class::cast).allMatch(v -> v == 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
public void testFastInsertUpdateConcurrent() throws Exception {
ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT)
.setIndexedTypes(Integer.class, Integer.class);