You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2018/11/21 12:18:01 UTC
ignite git commit: IGNITE-9928: MVCC TX: Late affinity assignment
support. This closes #5057.
Repository: ignite
Updated Branches:
refs/heads/master 529b6de6a -> a8fc7af01
IGNITE-9928: MVCC TX: Late affinity assignment support. This closes #5057.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8fc7af0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8fc7af0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8fc7af0
Branch: refs/heads/master
Commit: a8fc7af015febee09b70846277f1170b64bce74e
Parents: 529b6de
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Wed Nov 21 15:17:52 2018 +0300
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Wed Nov 21 15:17:52 2018 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManager.java | 5 +
.../cache/IgniteCacheOffheapManagerImpl.java | 18 ++
.../distributed/dht/GridDhtCacheEntry.java | 8 +-
.../dht/GridDhtTxAbstractEnlistFuture.java | 169 +++++++------
.../GridDhtPartitionsExchangeFuture.java | 33 ++-
.../dht/topology/GridDhtLocalPartition.java | 3 +-
.../near/GridNearTxEnlistFuture.java | 22 +-
.../GridNearTxQueryResultsEnlistFuture.java | 21 +-
.../persistence/GridCacheOffheapManager.java | 12 +
.../cache/persistence/tree/BPlusTree.java | 31 +++
.../cache/transactions/IgniteTxAdapter.java | 46 +++-
.../cache/transactions/IgniteTxHandler.java | 239 +++++++++++--------
.../cache/mvcc/CacheMvccAbstractTest.java | 19 +-
.../testframework/junits/GridAbstractTest.java | 2 +-
.../mvcc/CacheMvccBackupsAbstractTest.java | 2 +
...ccPartitionedSqlCoordinatorFailoverTest.java | 49 +++-
16 files changed, 439 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index db09a89..cdc44a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -675,6 +675,11 @@ public interface IgniteCacheOffheapManager {
long fullSize();
/**
+ * @return {@code True} if there are no items in the store.
+ */
+ boolean isEmpty();
+
+ /**
* Updates size metric for particular cache.
*
* @param cacheId Cache ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 6835795..e15009e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1533,6 +1533,24 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return storageSize.get();
}
+ /**
+ * @return {@code True} if there are no items in the store.
+ */
+ @Override public boolean isEmpty() {
+ try {
+ /*
+ * TODO https://issues.apache.org/jira/browse/IGNITE-10082
+ * Using of counters is cheaper than tree operations. Return size checking after the ticked is resolved.
+ */
+ return grp.mvccEnabled() ? dataTree.isEmpty() : storageSize.get() == 0;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to perform operation.", e);
+
+ return false;
+ }
+ }
+
/** {@inheritDoc} */
@Override public void updateSize(int cacheId, long delta) {
storageSize.addAndGet(delta);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index e2ad525..8eea5a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -648,10 +648,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
']');
}
- if (cctx.mvccEnabled())
- cctx.offheap().mvccRemoveAll(this);
- else
- removeValue();
+ if (cctx.mvccEnabled())
+ cctx.offheap().mvccRemoveAll(this);
+ else
+ removeValue();
// Give to GC.
update(null, 0L, 0L, ver, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 9949930..62084dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -386,12 +385,20 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
try {
while (true) {
+ int curPart = -1;
+ List<ClusterNode> backups = null;
+
while (hasNext0()) {
Object cur = next0();
- KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
+ KeyCacheObject key = toKey(op, cur);
+
+ if (curPart != key.partition())
+ backups = backupNodes(curPart = key.partition());
+
+ assert backups != null;
- if (!ensureFreeSlot(key)) {
+ if (!ensureFreeSlot(key, backups)) {
// Can't advance further at the moment.
peek = cur;
@@ -442,7 +449,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
cctx.localNodeId(),
topVer,
mvccSnapshot,
- isMoving(key.partition()),
+ isMoving(key.partition(), backups),
needOldVal,
filter,
needResult());
@@ -463,7 +470,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
topVer,
mvccSnapshot,
op.cacheOperation(),
- isMoving(key.partition()),
+ isMoving(key.partition(), backups),
op.noCreate(),
needOldVal,
filter,
@@ -502,13 +509,14 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
res = updateFut.get();
else {
GridDhtCacheEntry entry0 = entry;
+ List<ClusterNode> backups0 = backups;
it.beforeDetach();
updateFut.listen(new CI1<IgniteInternalFuture<GridCacheUpdateTxResult>>() {
@Override public void apply(IgniteInternalFuture<GridCacheUpdateTxResult> fut) {
try {
- processEntry(entry0, op, fut.get(), val0);
+ processEntry(entry0, op, fut.get(), val0, backups0);
continueLoop(true);
}
@@ -523,7 +531,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
}
}
- processEntry(entry, op, res, val0);
+ processEntry(entry, op, res, val0, backups);
}
if (!hasNext0()) {
@@ -591,6 +599,16 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
return peek != FINISHED;
}
+ /** */
+ private KeyCacheObject toKey(EnlistOperation op, Object cur) {
+ KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
+
+ if (key.partition() == -1)
+ key.partition(cctx.affinity().partition(key));
+
+ return key;
+ }
+
/**
* @return {@code True} if in-flight batches map is empty.
*/
@@ -611,41 +629,40 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* @param op Operation.
* @param updRes Update result.
* @param val New value.
+ * @param backups Backup nodes
* @throws IgniteCheckedException If failed.
*/
private void processEntry(GridDhtCacheEntry entry, EnlistOperation op,
- GridCacheUpdateTxResult updRes, Message val) throws IgniteCheckedException {
+ GridCacheUpdateTxResult updRes, Message val, List<ClusterNode> backups) throws IgniteCheckedException {
checkCompleted();
assert updRes != null && updRes.updateFuture() == null;
onEntryProcessed(entry.key(), updRes);
- if (!updRes.success())
+ if (!updRes.success()
+ || updRes.filtered()
+ || op == EnlistOperation.LOCK)
return;
- if (!updRes.filtered())
- cctx.shared().mvccCaching().addEnlisted(entry.key(), updRes.newValue(), 0, 0, lockVer,
- updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot, cctx.cacheId(), tx, null, -1);
+ cctx.shared().mvccCaching().addEnlisted(entry.key(), updRes.newValue(), 0, 0, lockVer,
+ updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot, cctx.cacheId(), tx, null, -1);
- if (op != EnlistOperation.LOCK)
- addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), op);
+ addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), backups);
}
/**
* Adds row to batch.
* <b>IMPORTANT:</b> This method should be called from the critical section in {@link this.sendNextBatches()}
- *
* @param key Key.
* @param val Value.
* @param hist History rows.
* @param cacheId Cache Id.
+ * @param backups Backup nodes
*/
private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearchRow> hist,
- int cacheId, EnlistOperation op) throws IgniteCheckedException {
- List<ClusterNode> backups = backupNodes(key);
-
- int part = cctx.affinity().partition(key);
+ int cacheId, List<ClusterNode> backups) throws IgniteCheckedException {
+ int part = key.partition();
tx.touchPartition(cacheId, part);
@@ -659,7 +676,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
boolean moving = isMoving(node, part);
- if (skipNearNodeUpdates && node.id().equals(nearNodeId) && !moving) {
+ if (skipNearLocalUpdate(node, moving)) {
updateMappings(node);
if (newRemoteTx(node))
@@ -680,15 +697,13 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
if (batch == null)
batches.put(node.id(), batch = new Batch(node));
- if (moving && hist0 == null && !op.isInvoke()) {
+ if (moving && hist0 == null) {
assert !F.isEmpty(hist);
hist0 = fetchHistoryInfo(key, hist);
}
- Message m = moving && !op.isInvoke() ? hist0 : val;
-
- batch.add(key, m);
+ batch.add(key, moving ? hist0 : val);
if (batch.size() == BATCH_SIZE) {
assert batches != null;
@@ -770,16 +785,19 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* Checks if there free space in batches or free slot in in-flight batches is available for the given key.
*
* @param key Key.
+ * @param backups Backup nodes.
* @return {@code True} if there is possible to add this key to batch or send ready batch.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private boolean ensureFreeSlot(KeyCacheObject key) {
+ private boolean ensureFreeSlot(KeyCacheObject key, List<ClusterNode> backups) {
if (F.isEmpty(batches) || F.isEmpty(pending))
return true;
+ int part = key.partition();
+
// Check possibility of adding to batch and sending.
- for (ClusterNode node : backupNodes(key)) {
- if (skipNearNodeUpdates && node.id().equals(nearNodeId) && !isMoving(node, key.partition()))
+ for (ClusterNode node : backups) {
+ if (skipNearLocalUpdate(node, isMoving(node, part)))
continue;
Batch batch = batches.get(node.id());
@@ -816,16 +834,14 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
if (newRemoteTx(node))
addNewRemoteTxNode(node);
- if (!firstReqSent.contains(node)) {
- firstReqSent.add(node);
-
+ if (firstReqSent.add(node)) {
// If this is a first request to this node, send full info.
req = new GridDhtTxQueryFirstEnlistRequest(cctx.cacheId(),
futId,
cctx.localNodeId(),
tx.topologyVersionSnapshot(),
lockVer,
- mvccSnapshot,
+ mvccSnapshot.withoutActiveTransactions(),
tx.remainingTime(),
tx.taskNameHash(),
nearNodeId,
@@ -863,7 +879,13 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
assert prev == null;
- cctx.io().send(node, req, cctx.ioPolicy());
+ try {
+ cctx.io().send(node, req, cctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ // backup node left the grid, will continue.
+ onNodeLeft(node.id());
+ }
}
/** */
@@ -880,20 +902,21 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
mapping.markQueryUpdate();
}
+ /** */
+ private boolean skipNearLocalUpdate(ClusterNode node, boolean moving) {
+ return skipNearNodeUpdates && node.id().equals(nearNodeId) && !moving;
+ }
+
/**
- * @param key Key.
- * @return Backup nodes for the given key.
+ * @param part Partition.
+ * @return Backup nodes for the given partition.
*/
- @NotNull private List<ClusterNode> backupNodes(KeyCacheObject key) {
- List<ClusterNode> dhtNodes = cctx.affinity().nodesByKey(key, tx.topologyVersion());
-
- assert !dhtNodes.isEmpty() && dhtNodes.get(0).id().equals(cctx.localNodeId()) :
- "localNode = " + cctx.localNodeId() + ", dhtNodes = " + dhtNodes;
+ @NotNull private List<ClusterNode> backupNodes(int part) {
+ List<ClusterNode> nodes = cctx.topology().nodes(part, tx.topologyVersion());
- if (dhtNodes.size() == 1)
- return Collections.emptyList();
+ assert nodes.size() > 0 && nodes.get(0).isLocal();
- return dhtNodes.subList(1, dhtNodes.size());
+ return nodes.subList(1, nodes.size());
}
/**
@@ -933,31 +956,33 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
/**
* @param part Partition.
+ * @param backups Backup nodes.
* @return {@code true} if the given partition is rebalancing to any backup node.
*/
- private boolean isMoving(int part) {
+ private boolean isMoving(int part, List<ClusterNode> backups) {
+ Boolean res;
+
if (movingParts == null)
movingParts = new HashMap<>();
- Boolean res = movingParts.get(part);
+ if ((res = movingParts.get(part)) == null)
+ movingParts.put(part, res = isMoving0(part, backups));
- if (res != null)
- return res;
-
- List<ClusterNode> dhtNodes = cctx.affinity().nodesByPartition(part, tx.topologyVersion());
-
- for (int i = 1; i < dhtNodes.size(); i++) {
- ClusterNode node = dhtNodes.get(i);
- if (isMoving(node, part)) {
- movingParts.put(part, Boolean.TRUE);
+ return res == Boolean.TRUE;
+ }
- return true;
- }
+ /**
+ * @param part Partition.
+ * @param backups Backup nodes.
+ * @return {@code true} if the given partition is rebalancing to any backup node.
+ */
+ private Boolean isMoving0(int part, List<ClusterNode> backups) {
+ for (ClusterNode node : backups) {
+ if (isMoving(node, part))
+ return Boolean.TRUE;
}
- movingParts.put(part, Boolean.FALSE);
-
- return false;
+ return Boolean.FALSE;
}
/**
@@ -966,9 +991,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* @return {@code true} if the given partition is rebalancing to the given node.
*/
private boolean isMoving(ClusterNode node, int part) {
- GridDhtPartitionState partState = cctx.topology().partitionState(node.id(), part);
-
- return partState != GridDhtPartitionState.OWNING && partState != GridDhtPartitionState.EVICTED;
+ return cctx.topology().partitionState(node.id(), part) == GridDhtPartitionState.MOVING;
}
/** */
@@ -1021,23 +1044,17 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
- boolean backupLeft = false;
-
- Set<ClusterNode> nodes = tx.lockTransactionNodes();
-
- if (!F.isEmpty(nodes)) {
- for (ClusterNode node : nodes) {
- if (node.id().equals(nodeId)) {
- backupLeft = true;
-
- break;
- }
- }
+ try {
+ if (nearNodeId.equals(nodeId))
+ onDone(new ClusterTopologyCheckedException("Requesting node left the grid [nodeId=" + nodeId + ']'));
+ else if (pending != null && pending.remove(nodeId) != null)
+ cctx.kernalContext().closure().runLocalSafe(() -> continueLoop(false));
+ }
+ catch (Exception e) {
+ onDone(e);
}
- return (backupLeft || nearNodeId.equals(nodeId)) && onDone(
- new ClusterTopologyCheckedException((backupLeft ? "Backup" : "Requesting") +
- " node left the grid [nodeId=" + nodeId + ']'));
+ return false;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 86000cd..e9d9442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -593,7 +593,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @return First event discovery event.1
+ * @return First event discovery event.
*
*/
public DiscoveryEvent firstEvent() {
@@ -3566,27 +3566,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* failed to send update counter deltas to backup.
*/
private void finalizePartitionCounters() {
- long time = System.currentTimeMillis();
+ int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
- try {
- int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
+ // Reserve at least 2 threads for system operations.
+ parallelismLvl = Math.max(1, parallelismLvl - 2);
- // Reserve at least 2 threads for system operations.
- parallelismLvl = Math.max(1, parallelismLvl - 2);
+ long time = System.currentTimeMillis();
- if (parallelismLvl > 1) {
- U.doInParallel(parallelismLvl,
- cctx.kernalContext().getSystemExecutorService(),
- nonLocalCacheGroups(),
- grp -> {
- grp.topology().finalizeUpdateCounters();
+ try {
+ U.<CacheGroupContext, Void>doInParallel(
+ parallelismLvl,
+ cctx.kernalContext().getSystemExecutorService(),
+ nonLocalCacheGroups(),
+ grp -> {
+ grp.topology().finalizeUpdateCounters();
- return null;
- }
- );
- }
- else
- nonLocalCacheGroups().forEach(grp -> grp.topology().finalizeUpdateCounters());
+ return null;
+ }
+ );
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to finalize partition counters", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index f3516a6..833a2b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -350,7 +351,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if partition is empty.
*/
public boolean isEmpty() {
- return store.fullSize() == 0 && internalSize() == 0;
+ return store.isEmpty() && internalSize() == 0;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index 2a7c2e4..35a8cfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -209,11 +209,9 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
- List<ClusterNode> nodes = cctx.affinity().nodesByKey(key, topVer);
+ ClusterNode node = cctx.affinity().primaryByKey(key, topVer);
- ClusterNode node;
-
- if (F.isEmpty(nodes) || ((node = nodes.get(0)) == null))
+ if (node == null)
throw new ClusterTopologyCheckedException("Failed to get primary node " +
"[topVer=" + topVer + ", key=" + key + ']');
@@ -240,8 +238,7 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
break;
}
- batch.add(op.isDeleteOrLock() ? key : cur,
- op != EnlistOperation.LOCK && cctx.affinityNode() && (cctx.isReplicated() || nodes.indexOf(cctx.localNode()) > 0));
+ batch.add(op.isDeleteOrLock() ? key : cur, !node.isLocal() && isLocalBackup(op, key));
if (batch.size() == batchSize)
res = markReady(res, batch);
@@ -297,6 +294,16 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
return peek != FINISHED;
}
+ /** */
+ private boolean isLocalBackup(EnlistOperation op, KeyCacheObject key) {
+ if (!cctx.affinityNode() || op == EnlistOperation.LOCK)
+ return false;
+ else if (cctx.isReplicated())
+ return true;
+
+ return cctx.topology().nodes(key.partition(), tx.topologyVersion()).indexOf(cctx.localNode()) > 0;
+ }
+
/**
* Add batch to batch collection if it is ready.
*
@@ -604,6 +611,9 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
assert this.res != null && (this.res.emptyResult() || needRes || this.res.invokeResult() || !this.res.success());
+
+ tx.hasRemoteLocks(true);
+
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index d6deed1..316aae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -200,11 +200,9 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
- List<ClusterNode> nodes = cctx.affinity().nodesByKey(key, topVer);
+ ClusterNode node = cctx.affinity().primaryByPartition(key.partition(), topVer);
- ClusterNode node;
-
- if (F.isEmpty(nodes) || ((node = nodes.get(0)) == null))
+ if (node == null)
throw new ClusterTopologyCheckedException("Failed to get primary node " +
"[topVer=" + topVer + ", key=" + key + ']');
@@ -229,8 +227,7 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
break;
}
- batch.add(op.isDeleteOrLock() ? key : cur,
- op != EnlistOperation.LOCK && cctx.affinityNode() && (cctx.isReplicated() || nodes.indexOf(cctx.localNode()) > 0));
+ batch.add(op.isDeleteOrLock() ? key : cur, !node.isLocal() && isLocalBackup(op, key));
if (batch.size() == batchSize)
res = markReady(res, batch);
@@ -287,6 +284,16 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
}
/** */
+ private boolean isLocalBackup(EnlistOperation op, KeyCacheObject key) {
+ if (!cctx.affinityNode() || op == EnlistOperation.LOCK)
+ return false;
+ else if (cctx.isReplicated())
+ return true;
+
+ return cctx.topology().nodes(key.partition(), tx.topologyVersion()).contains(cctx.localNode());
+ }
+
+ /** */
private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) {
if (!batch.ready()) {
batch.ready(true);
@@ -571,6 +578,8 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
RES_UPD.getAndAdd(this, res.result());
+ tx.hasRemoteLocks(true);
+
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 73cb878..fb287bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1770,6 +1770,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public boolean isEmpty() {
+ try {
+ CacheDataStore delegate0 = init0(true);
+
+ return delegate0 == null || delegate0.isEmpty();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public long cacheSize(int cacheId) {
try {
CacheDataStore delegate0 = init0(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 17799d3..7852008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -2098,6 +2098,37 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
/**
+ * @return {@code True} in case the tree is empty.
+ * @throws IgniteCheckedException If failed.
+ */
+ public final boolean isEmpty() throws IgniteCheckedException {
+ checkDestroyed();
+
+ for (;;) {
+ TreeMetaData treeMeta = treeMeta();
+
+ long rootId, rootPage = acquirePage(rootId = treeMeta.rootId);
+
+ long rootAddr = readLock(rootId, rootPage);
+
+ if (rootAddr == 0) {
+ checkDestroyed();
+
+ continue;
+ }
+
+ try {
+ BPlusIO<L> io = io(rootAddr);
+
+ return io.getCount(rootAddr) == 0;
+ }
+ finally {
+ readUnlock(rootId, rootPage, rootAddr);
+ }
+ }
+ }
+
+ /**
* Returns number of elements in the tree by scanning pages of the bottom (leaf) level.
* Since a concurrent access is permitted, there is no guarantee about
* momentary consistency: the method may miss updates made in already scanned pages.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 52659e3..ec1b646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -62,7 +62,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -1983,21 +1985,51 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
for (Map.Entry<Integer, ? extends Map<Integer, AtomicLong>> entry : sizeDeltas.entrySet()) {
Integer cacheId = entry.getKey();
- Map<Integer, AtomicLong> partDeltas = entry.getValue();
+ Map<Integer, AtomicLong> deltas = entry.getValue();
- assert !F.isEmpty(partDeltas);
+ assert !F.isEmpty(deltas);
GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology();
- for (Map.Entry<Integer, AtomicLong> e : partDeltas.entrySet()) {
- Integer p = e.getKey();
+ // Need to reserve on backups only
+ boolean reserve = dht() && remote();
+
+ for (Map.Entry<Integer, AtomicLong> e : deltas.entrySet()) {
+ boolean invalid = false;
+ int p = e.getKey();
long delta = e.getValue().get();
- GridDhtLocalPartition dhtPart = top.localPartition(p);
+ try {
+ GridDhtLocalPartition part = top.localPartition(p);
+
+ if (!reserve || part != null && part.reserve()) {
+ assert part != null;
+
+ try {
+ if (part.state() != GridDhtPartitionState.RENTING)
+ part.dataStore().updateSize(cacheId, delta);
+ else
+ invalid = true;
+ }
+ finally {
+ if (reserve)
+ part.release();
+ }
+ }
+ else
+ invalid = true;
+ }
+ catch (GridDhtInvalidPartitionException e1) {
+ invalid = true;
+ }
- assert dhtPart != null;
+ if (invalid) {
+ assert reserve;
- dhtPart.dataStore().updateSize(cacheId, delta);
+ if (log.isDebugEnabled())
+ log.debug("Trying to apply size delta for invalid partition: " +
+ "[cacheId=" + cacheId + ", part=" + p + "]");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 3685f7a..aefd54c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -25,7 +25,6 @@ import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -34,7 +33,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -64,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeVal
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
@@ -1664,8 +1663,6 @@ public class IgniteTxHandler {
GridDhtTxRemote tx = ctx.tm().tx(req.version());
if (tx == null) {
- assert !req.queryUpdate();
-
boolean single = req.last() && req.writes().size() == 1;
tx = new GridDhtTxRemote(
@@ -1866,117 +1863,132 @@ public class IgniteTxHandler {
int part = ctx.affinity().partition(key);
- GridDhtLocalPartition locPart = ctx.topology().localPartition(part, tx.topologyVersion(), false);
+ try {
+ GridDhtLocalPartition locPart = ctx.topology().localPartition(part, tx.topologyVersion(), false);
- if (locPart == null || !locPart.reserve())
- throw new ClusterTopologyException("Can not reserve partition. Please retry on stable topology.");
+ if (locPart != null && locPart.reserve()) {
+ try {
+ // do not process renting partitions.
+ if (locPart.state() == GridDhtPartitionState.RENTING) {
+ tx.addInvalidPartition(ctx, part);
- try {
- CacheObject val = null;
- EntryProcessor entryProc = null;
- Object[] invokeArgs = null;
+ continue;
+ }
- boolean needOldVal = ctx.shared().mvccCaching().continuousQueryListeners(ctx, tx, key) != null;
+ CacheObject val = null;
+ EntryProcessor entryProc = null;
+ Object[] invokeArgs = null;
- Message val0 = vals != null ? vals.get(i) : null;
+ boolean needOldVal = ctx.shared().mvccCaching().continuousQueryListeners(ctx, tx, key) != null;
- CacheEntryInfoCollection entries =
- val0 instanceof CacheEntryInfoCollection ? (CacheEntryInfoCollection)val0 : null;
+ Message val0 = vals != null ? vals.get(i) : null;
- if (entries == null && !op.isDeleteOrLock() && !op.isInvoke())
- val = (val0 instanceof CacheObject) ? (CacheObject)val0 : null;
+ CacheEntryInfoCollection entries =
+ val0 instanceof CacheEntryInfoCollection ? (CacheEntryInfoCollection)val0 : null;
- if(entries == null && op.isInvoke()) {
- assert val0 instanceof GridInvokeValue;
+ if (entries == null && !op.isDeleteOrLock() && !op.isInvoke())
+ val = (val0 instanceof CacheObject) ? (CacheObject)val0 : null;
- GridInvokeValue invokeVal = (GridInvokeValue)val0;
+ if (entries == null && op.isInvoke()) {
+ assert val0 instanceof GridInvokeValue;
- entryProc = invokeVal.entryProcessor();
- invokeArgs = invokeVal.invokeArgs();
- }
+ GridInvokeValue invokeVal = (GridInvokeValue)val0;
- assert entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
+ entryProc = invokeVal.entryProcessor();
+ invokeArgs = invokeVal.invokeArgs();
+ }
- GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
+ assert entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
- GridCacheUpdateTxResult updRes;
+ GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
- while (true) {
- ctx.shared().database().checkpointReadLock();
+ GridCacheUpdateTxResult updRes;
- try {
- if (entries == null) {
- switch (op) {
- case DELETE:
- updRes = entry.mvccRemove(
- tx,
- ctx.localNodeId(),
- tx.topologyVersion(),
- snapshot,
- false,
- needOldVal,
- null,
- false);
-
- break;
-
- case INSERT:
- case TRANSFORM:
- case UPSERT:
- case UPDATE:
- updRes = entry.mvccSet(
- tx,
+ while (true) {
+ ctx.shared().database().checkpointReadLock();
+
+ try {
+ if (entries == null) {
+ switch (op) {
+ case DELETE:
+ updRes = entry.mvccRemove(
+ tx,
+ ctx.localNodeId(),
+ tx.topologyVersion(),
+ snapshot,
+ false,
+ needOldVal,
+ null,
+ false);
+
+ break;
+
+ case INSERT:
+ case TRANSFORM:
+ case UPSERT:
+ case UPDATE:
+ updRes = entry.mvccSet(
+ tx,
+ ctx.localNodeId(),
+ val,
+ entryProc,
+ invokeArgs,
+ 0,
+ tx.topologyVersion(),
+ snapshot,
+ op.cacheOperation(),
+ false,
+ false,
+ needOldVal,
+ null,
+ false);
+
+ break;
+
+ default:
+ throw new IgniteSQLException("Cannot acquire lock for operation [op= "
+ + op + "]" + "Operation is unsupported at the moment ",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
+ }
+ else {
+ updRes = entry.mvccUpdateRowsWithPreloadInfo(tx,
ctx.localNodeId(),
- val,
- entryProc,
- invokeArgs,
- 0,
tx.topologyVersion(),
- snapshot,
+ entries.infos(),
op.cacheOperation(),
- false,
- false,
- needOldVal,
- null,
- false);
-
- break;
-
- default:
- throw new IgniteSQLException("Cannot acquire lock for operation [op= "
- + op + "]" + "Operation is unsupported at the moment ",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ snapshot,
+ futId,
+ batchNum);
+ }
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ entry = dht.entryExx(key);
+ }
+ finally {
+ ctx.shared().database().checkpointReadUnlock();
}
- }
- else {
- updRes = entry.mvccUpdateRowsWithPreloadInfo(tx,
- ctx.localNodeId(),
- tx.topologyVersion(),
- entries.infos(),
- op.cacheOperation(),
- snapshot,
- futId,
- batchNum);
}
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- entry = dht.entryExx(key);
+ if (!updRes.filtered())
+ ctx.shared().mvccCaching().addEnlisted(key, updRes.newValue(), 0, 0, tx.xidVersion(),
+ updRes.oldValue(), tx.local(), tx.topologyVersion(), snapshot, ctx.cacheId(), tx, futId, batchNum);
+
+ assert updRes.updateFuture() == null : "Entry should not be locked on the backup";
}
+
finally {
- ctx.shared().database().checkpointReadUnlock();
+ locPart.release();
}
}
-
- if (!updRes.filtered())
- ctx.shared().mvccCaching().addEnlisted(key, updRes.newValue(), 0, 0, tx.xidVersion(),
- updRes.oldValue(), tx.local(), tx.topologyVersion(), snapshot, ctx.cacheId(), tx, futId, batchNum);
-
- assert updRes.updateFuture() == null : "Entry should not be locked on the backup";
+ else {
+ tx.addInvalidPartition(ctx, part);
+ }
}
- finally {
- locPart.release();
+ catch (GridDhtInvalidPartitionException e) {
+ tx.addInvalidPartition(ctx, e.partition());
}
}
}
@@ -2218,26 +2230,42 @@ public class IgniteTxHandler {
if (counters == null)
return;
- int cacheId = CU.UNDEFINED_CACHE_ID;
- GridDhtPartitionTopology top = null;
-
for (PartitionUpdateCountersMessage counter : counters) {
- if (counter.cacheId() != cacheId) {
- GridCacheContext ctx0 = ctx.cacheContext(cacheId = counter.cacheId());
+ GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId());
- assert ctx0.mvccEnabled();
+ assert ctx0.mvccEnabled();
- top = ctx0.topology();
- }
+ GridDhtPartitionTopology top = ctx0.topology();
assert top != null;
for (int i = 0; i < counter.size(); i++) {
- GridDhtLocalPartition part = top.localPartition(counter.partition(i));
+ boolean invalid = false;
+
+ try {
+ GridDhtLocalPartition part = top.localPartition(counter.partition(i));
- assert part != null;
+ if (part != null && part.reserve()) {
+ try {
+ if (part.state() != GridDhtPartitionState.RENTING)
+ part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
+ else
+ invalid = true;
+ }
+ finally {
+ part.release();
+ }
+ }
+ else
+ invalid = true;
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ invalid = true;
+ }
- part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
+ if (invalid && log.isDebugEnabled())
+ log.debug("Received partition update counters message for invalid partition: " +
+ "[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + "]");
}
}
}
@@ -2250,25 +2278,24 @@ public class IgniteTxHandler {
@Nullable public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(
IgniteInternalTx tx, ClusterNode node) {
TxCounters txCntrs = tx.txCounters(false);
+ Collection<PartitionUpdateCountersMessage> updCntrs;
- if (txCntrs == null || F.isEmpty(txCntrs.updateCounters()))
+ if (txCntrs == null || F.isEmpty(updCntrs = txCntrs.updateCounters()))
return null;
- Collection<PartitionUpdateCountersMessage> updCntrs = txCntrs.updateCounters();
-
List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size());
AffinityTopologyVersion top = tx.topologyVersionSnapshot();
for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
- GridCacheAffinityManager affinity = ctx.cacheContext(partCntrs.cacheId()).affinity();
+ GridDhtPartitionTopology topology = ctx.cacheContext(partCntrs.cacheId()).topology();
PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
for (int i = 0; i < partCntrs.size(); i++) {
int part = partCntrs.partition(i);
- if (affinity.backupByPartition(node, part, top))
+ if (topology.nodes(part, top).indexOf(node) > 0)
resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 7869e13..4283d1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -944,6 +944,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
tx.commit();
+ v++;
+
first = false;
}
@@ -951,10 +953,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
for (Integer k : keys)
- assertEquals("key=" + k, v, (Object)res.get(k));
+ assertEquals("key=" + k, v - 1, (Object)res.get(k));
}
-
- v++;
}
catch (Exception e) {
handleTxException(e);
@@ -1366,6 +1366,13 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}
}, readers, "reader");
+ GridTestUtils.runAsync(() -> {
+ while (System.currentTimeMillis() < stopTime)
+ doSleep(1000);
+
+ stop.set(true);
+ });
+
while (System.currentTimeMillis() < stopTime && !stop.get()) {
Thread.sleep(1000);
@@ -1407,8 +1414,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
Ignite srv = startGrid(idx);
+ cache0 = new TestCache(srv.cache(DEFAULT_CACHE_NAME));
+
synchronized (caches) {
- caches.set(idx, new TestCache(srv.cache(DEFAULT_CACHE_NAME)));
+ caches.set(idx, cache0);
}
awaitPartitionMapExchange();
@@ -1422,8 +1431,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}
}
- stop.set(true);
-
Exception ex = null;
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 817c56b..1f42694 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -2344,7 +2344,7 @@ public abstract class GridAbstractTest extends TestCase {
U.sleep(millis);
}
catch (Exception e) {
- throw new IgniteException();
+ throw new IgniteException(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
index 998cb76..894e4bb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
@@ -182,6 +182,8 @@ public abstract class CacheMvccBackupsAbstractTest extends CacheMvccAbstractTest
* @throws Exception If failed.
*/
public void testBackupsCoherenceWithLargeOperations() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-10104");
+
disableScheduledVacuum = true;
ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, 10)
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
index b347497..eee270c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.CacheMode;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -64,14 +63,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
- public void testPutAllGetAll_ClientServer_Backups1_Restart_Scan() throws Exception {
- // TODO add tests with RESTART_RND_SRV https://issues.apache.org/jira/browse/IGNITE-9928
- putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 2, 2, 64, /*new InitIndexing(Integer.class, Integer.class)*/ null, SCAN, PUT);
- }
-
- /**
- * @throws Exception If failed.
- */
public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator_ScanDml_Persistence() throws Exception {
persistence = true;
@@ -100,6 +91,46 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
+ public void testPutAllGetAll_ClientServer_Backups1_RestartRandomSrv_SqlDml() throws Exception {
+ putAllGetAll(RestartMode.RESTART_RND_SRV, 3, 1, 1, DFLT_PARTITION_COUNT,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_ClientServer_Backups2_RestartRandomSrv_SqlDml() throws Exception {
+ putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 1, 2, DFLT_PARTITION_COUNT,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_Server_Backups2_RestartRandomSrv_SqlDml() throws Exception {
+ putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 0, 2, DFLT_PARTITION_COUNT,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_Server_Backups1_SinglePartition_RestartRandomSrv_SqlDml() throws Exception {
+ putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 0, 1, 1,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_ClientServer_Backups1_SinglePartition_RestartRandomSrv_SqlDml() throws Exception {
+ putAllGetAll(RestartMode.RESTART_RND_SRV, 3, 1, 1, 1,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testUpdate_N_Objects_ClientServer_Backups2_Sql() throws Exception {
updateNObjectsTest(7, 3, 2, 2, DFLT_PARTITION_COUNT, DFLT_TEST_TIME,
new InitIndexing(Integer.class, Integer.class), SQL, DML, RestartMode.RESTART_CRD);