You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/22 15:16:54 UTC
[02/14] ignite git commit: IGNITE-5935: MVCC TX: Tx recovery protocol
IGNITE-5935: MVCC TX: Tx recovery protocol
This closes #4920
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5939a947
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5939a947
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5939a947
Branch: refs/heads/ignite-9720
Commit: 5939a94763c8a3e92b66b3f591a816dd6c49f35a
Parents: 82d2efe
Author: ipavlukhin <vo...@gmail.com>
Authored: Fri Oct 19 17:40:12 2018 +0300
Committer: Igor Sapego <is...@apache.org>
Committed: Fri Oct 19 17:40:12 2018 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 6 +-
.../communication/GridIoMessageFactory.java | 18 +
.../GridCachePartitionExchangeManager.java | 4 +-
.../cache/IgniteCacheOffheapManager.java | 6 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 5 +
.../cache/PartitionUpdateCounter.java | 30 +-
.../distributed/GridCacheTxRecoveryFuture.java | 11 -
.../GridDistributedTxRemoteAdapter.java | 52 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 4 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 38 --
.../distributed/dht/GridDhtTxPrepareFuture.java | 2 +-
.../topology/GridClientPartitionTopology.java | 3 +-
.../dht/topology/GridDhtLocalPartition.java | 7 +
.../dht/topology/GridDhtPartitionTopology.java | 5 +-
.../topology/GridDhtPartitionTopologyImpl.java | 6 +-
.../processors/cache/mvcc/MvccProcessor.java | 7 -
.../cache/mvcc/MvccProcessorImpl.java | 217 +++++-
.../processors/cache/mvcc/MvccUtils.java | 6 +-
.../mvcc/msg/MvccRecoveryFinishedMessage.java | 116 ++++
.../PartitionCountersNeighborcastRequest.java | 145 ++++
.../PartitionCountersNeighborcastResponse.java | 114 ++++
.../persistence/GridCacheOffheapManager.java | 13 +
.../cache/transactions/IgniteTxAdapter.java | 3 +-
.../cache/transactions/IgniteTxHandler.java | 125 ++++
.../cache/transactions/IgniteTxManager.java | 123 +++-
.../PartitionCountersNeighborcastFuture.java | 211 ++++++
.../cache/transactions/TxCounters.java | 3 +-
.../continuous/GridContinuousProcessor.java | 6 +-
...xOriginatingNodeFailureAbstractSelfTest.java | 2 +-
...cOriginatingNodeFailureAbstractSelfTest.java | 69 +-
...itionedTxOriginatingNodeFailureSelfTest.java | 2 +-
...woBackupsPrimaryNodeFailureRecoveryTest.java | 2 +-
...ePrimaryNodeFailureRecoveryAbstractTest.java | 133 +++-
...licatedTxOriginatingNodeFailureSelfTest.java | 2 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 4 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
.../IgniteCacheTxRecoverySelfTestSuite.java | 2 +-
.../cache/mvcc/CacheMvccTxRecoveryTest.java | 654 +++++++++++++++++++
...GridIndexRebuildWithMvccEnabledSelfTest.java | 3 +-
.../testsuites/IgniteCacheMvccSqlTestSuite.java | 58 ++
40 files changed, 1980 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 2599d7a..7492e51 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -43,9 +43,7 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -171,7 +169,7 @@ public class MessageCodeGenerator {
// gen.generateAll(true);
- gen.generateAndWrite(GridNearTxEnlistResponse.class);
+ gen.generateAndWrite(MvccRecoveryFinishedMessage.class);
// gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e405d7d..3f4eb18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -134,9 +134,12 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQ
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -1096,6 +1099,21 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 164:
+ msg = new MvccRecoveryFinishedMessage();
+
+ break;
+
+ case 165:
+ msg = new PartitionCountersNeighborcastRequest();
+
+ break;
+
+ case 166:
+ msg = new PartitionCountersNeighborcastResponse();
+
+ break;
+
// [-3..119] [124..129] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6af9678..0b8dd75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1405,7 +1405,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
grp.affinity().similarAffinityKey());
if (sndCounters) {
- CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true);
+ CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true, true);
m.addPartitionUpdateCounters(grp.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
@@ -1429,7 +1429,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top.similarAffinityKey());
if (sndCounters) {
- CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true);
+ CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true, true);
m.addPartitionUpdateCounters(top.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 044830c..e9ec025 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1067,8 +1067,12 @@ public interface IgniteCacheOffheapManager {
* Return PendingTree for data store.
*
* @return PendingTree instance.
- * @throws IgniteCheckedException
*/
PendingEntriesTree pendingTree();
+
+ /**
+ * Flushes pending update counters closing all possible gaps.
+ */
+ void finalizeUpdateCountres();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e40cc53..e547784 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1556,6 +1556,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public void finalizeUpdateCountres() {
+ pCntr.finalizeUpdateCountres();
+ }
+
+ /** {@inheritDoc} */
@Override public String name() {
return name;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index b5960ab..fe44708 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -17,8 +17,7 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteLogger;
import org.jetbrains.annotations.NotNull;
@@ -31,7 +30,7 @@ public class PartitionUpdateCounter {
private IgniteLogger log;
/** Queue of counter update tasks*/
- private final Queue<Item> queue = new PriorityQueue<>();
+ private final TreeSet<Item> queue = new TreeSet<>();
/** Counter. */
private final AtomicLong cntr = new AtomicLong();
@@ -161,21 +160,34 @@ public class PartitionUpdateCounter {
* @return Retrieves the minimum update counter task from queue.
*/
private Item poll() {
- return queue.poll();
+ return queue.pollFirst();
}
/**
* @return Checks the minimum update counter task from queue.
*/
private Item peek() {
- return queue.peek();
+ return queue.isEmpty() ? null : queue.first();
+
}
/**
* @param item Adds update task to priority queue.
*/
private void offer(Item item) {
- queue.offer(item);
+ queue.add(item);
+ }
+
+ /**
+ * Flushes pending update counters closing all possible gaps.
+ */
+ public synchronized void finalizeUpdateCountres() {
+ Item last = queue.pollLast();
+
+ if (last != null)
+ update(last.start + last.delta);
+
+ queue.clear();
}
/**
@@ -199,11 +211,7 @@ public class PartitionUpdateCounter {
/** {@inheritDoc} */
@Override public int compareTo(@NotNull Item o) {
- int cmp = Long.compare(this.start, o.start);
-
- assert cmp != 0;
-
- return cmp;
+ return Long.compare(this.start, o.start);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 3fb1e4f..5e0deb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -146,17 +146,6 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
*/
@SuppressWarnings("ConstantConditions")
public void prepare() {
- if (tx.txState().mvccEnabled()) { // TODO IGNITE-5935
- U.error(log, "Cannot commit MVCC enabled transaction by recovery procedure. " +
- "Operation is usupported at the moment [tx=" + CU.txString(tx) + ']');
-
- onDone(false);
-
- markInitialized();
-
- return;
- }
-
if (nearTxCheck) {
UUID nearNodeId = tx.eventNodeId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 4db4685..3cabaec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -50,9 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWra
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
@@ -770,15 +767,15 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// Apply update counters.
if (txCntrs != null)
- applyPartitionsUpdatesCounters(txCntrs.updateCounters());
+ cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCntrs.updateCounters());
- cctx.mvccCaching().onTxFinished(this, true);
+ cctx.mvccCaching().onTxFinished(this, true);
- if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) {
- // Set new update counters for data entries received from persisted tx entries.
- List<DataEntry> entriesWithCounters = dataEntries.stream()
- .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
- .collect(Collectors.toList());
+ if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) {
+ // Set new update counters for data entries received from persisted tx entries.
+ List<DataEntry> entriesWithCounters = dataEntries.stream()
+ .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
+ .collect(Collectors.toList());
cctx.wal().log(new DataRecord(entriesWithCounters));
}
@@ -921,7 +918,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
TxCounters counters = txCounters(false);
if (counters != null)
- applyPartitionsUpdatesCounters(counters.updateCounters());
+ cctx.tm().txHandler().applyPartitionsUpdatesCounters(counters.updateCounters());
state(ROLLED_BACK);
@@ -996,39 +993,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
- /**
- * Applies partition counters updates for mvcc transactions.
- *
- * @param counters Counters values to be updated.
- */
- private void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters) {
- if (counters == null)
- return;
-
- int cacheId = CU.UNDEFINED_CACHE_ID;
- GridDhtPartitionTopology top = null;
-
- for (PartitionUpdateCountersMessage counter : counters) {
- if (counter.cacheId() != cacheId) {
- GridCacheContext ctx0 = cctx.cacheContext(cacheId = counter.cacheId());
-
- assert ctx0.mvccEnabled();
-
- top = ctx0.topology();
- }
-
- assert top != null;
-
- for (int i = 0; i < counter.size(); i++) {
- GridDhtLocalPartition part = top.localPartition(counter.partition(i));
-
- assert part != null;
-
- part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
- }
- }
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 9f96b46..d0fbd90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -375,7 +375,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
false,
false,
tx.mvccSnapshot(),
- tx.filterUpdateCountersForBackupNode(n));
+ cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n));
try {
cctx.io().send(n, req, tx.ioPolicy());
@@ -488,7 +488,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
false,
false,
mvccSnapshot,
- commit ? null : tx.filterUpdateCountersForBackupNode(n));
+ commit ? null : cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n));
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 483990f..86f9c3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -34,7 +33,6 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -48,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanMap;
@@ -944,41 +941,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
return prepFut;
}
- /**
- * @param node Backup node.
- * @return Partition counters map for the given backup node.
- */
- public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(ClusterNode node) {
- TxCounters txCntrs = txCounters(false);
-
- if (txCntrs == null || F.isEmpty(txCntrs.updateCounters()))
- return null;
-
- Collection<PartitionUpdateCountersMessage> updCntrs = txCntrs.updateCounters();
-
- List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size());
-
- AffinityTopologyVersion top = topologyVersionSnapshot();
-
- for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
- GridCacheAffinityManager affinity = cctx.cacheContext(partCntrs.cacheId()).affinity();
-
- PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
-
- for (int i = 0; i < partCntrs.size(); i++) {
- int part = partCntrs.partition(i);
-
- if (affinity.backupByPartition(node, part, top))
- resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
- }
-
- if (resCntrs.size() > 0)
- res.add(resCntrs);
- }
-
- return res;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c505677..609bff8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1398,7 +1398,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
tx.storeWriteThrough(),
retVal,
mvccSnapshot,
- tx.filterUpdateCountersForBackupNode(n));
+ cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n));
req.queryUpdate(dhtMapping.queryUpdate());
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 9140322..cd6e254 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1234,7 +1234,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+ @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
+ boolean finalizeCntrsBeforeCollecting) {
return CachePartitionPartialCountersMap.EMPTY;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 253a56a..2ddc0d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -1371,6 +1371,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
+ * Flushes pending update counters closing all possible gaps.
+ */
+ public void finalizeUpdateCountres() {
+ store.finalizeUpdateCountres();
+ }
+
+ /**
* Removed entry holder.
*/
private static class RemovedEntryHolder {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index b6cb5bb..25b284e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -350,9 +350,12 @@ public interface GridDhtPartitionTopology {
public CachePartitionFullCountersMap fullUpdateCounters();
/**
+ * @param skipZeros {@code True} for adding zero counter to map.
+ * @param finalizeCntrsBeforeCollecting {@code True} indicates that partition counters should be finalized.
* @return Partition update counters.
*/
- public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
+ public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
+ boolean finalizeCntrsBeforeCollecting);
/**
* @return Partition cache sizes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 94bb7f1..1f338d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -2657,7 +2657,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+ @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
+ boolean finalizeCntrsBeforeCollecting) {
lock.readLock().lock();
try {
@@ -2678,6 +2679,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
+ if (finalizeCntrsBeforeCollecting)
+ part.finalizeUpdateCountres();
+
long updCntr = part.updateCounter();
long initCntr = part.initialUpdateCounter();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index a09468f..a926acf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -180,13 +180,6 @@ public interface MvccProcessor extends GridProcessor {
/**
* Requests snapshot on Mvcc coordinator.
*
- * @return Snapshot future.
- */
- IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync();
-
- /**
- * Requests snapshot on Mvcc coordinator.
- *
* @param tx Transaction.
* @return Snapshot future.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index f17c137..9fcafb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -20,14 +20,17 @@ package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -68,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMes
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
@@ -189,8 +193,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** */
private final GridAtomicLong committedCntr = new GridAtomicLong(MVCC_INITIAL_CNTR);
- /** */
- private final Map<Long, Long> activeTxs = new HashMap<>();
+ /**
+ * Contains active transactions on mvcc coordinator. Key is mvcc counter.
+ * Access is protected by "this" monitor.
+ */
+ private final Map<Long, ActiveTx> activeTxs = new HashMap<>();
/** Active query trackers. */
private final Map<Long, MvccQueryTracker> activeTrackers = new ConcurrentHashMap<>();
@@ -223,6 +230,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
private volatile boolean mvccSupported = true;
/**
+ * Maps failed node id to votes accumulator for that node.
+ */
+ private final ConcurrentHashMap<UUID, RecoveryBallotBox> recoveryBallotBoxes = new ConcurrentHashMap<>();
+
+ /**
* @param ctx Context.
*/
public MvccProcessorImpl(GridKernalContext ctx) {
@@ -363,8 +375,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public void onExchangeDone(boolean newCrd, DiscoCache discoCache, Map<UUID, GridLongList> activeQueries) {
- if (!newCrd)
+ if (!newCrd) {
+ if (curCrd != null && ctx.localNodeId().equals(curCrd.nodeId()) && discoCache != null)
+ cleanupOrphanedServerTransactions(discoCache.serverNodes());
+
return;
+ }
ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange();
@@ -391,6 +407,33 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
}
+    /**
+     * Cleans up active transactions whose near node was a server node that is no longer alive.
+     * Executed on the coordinator.
+     *
+     * @param liveSrvs Live server nodes at the moment of cleanup.
+     */
+    private void cleanupOrphanedServerTransactions(Collection<ClusterNode> liveSrvs) {
+        Set<UUID> ids = liveSrvs.stream()
+            .map(ClusterNode::id)
+            .collect(Collectors.toSet());
+
+        List<Long> forRmv = new ArrayList<>();
+
+        synchronized (this) {
+            for (Map.Entry<Long, ActiveTx> entry : activeTxs.entrySet()) {
+                ActiveTx activeTx = entry.getValue();
+
+                // Only server-started transactions are checked here: if the node that started the tx
+                // is not among the live servers, the tx is removed from the active list.
+                if (activeTx instanceof ActiveServerTx && !ids.contains(activeTx.nearNodeId))
+                    forRmv.add(entry.getKey());
+            }
+        }
+
+        // onTxDone() acquires the monitor itself, so it is called after the scan.
+        // The committed counter is increased because it is not known whether the transaction was committed,
+        // and the committed counter must be bumped for committed transactions as it is used in
+        // (read-only) query snapshots.
+        for (Long txCntr : forRmv)
+            onTxDone(txCntr, true);
+    }
+
/** {@inheritDoc} */
@Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) {
prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
@@ -530,17 +573,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (!ctx.localNodeId().equals(crd.nodeId()) || !initFut.isDone())
return null;
else if (tx != null)
- return assignTxSnapshot(0L);
+ return assignTxSnapshot(0L, ctx.localNodeId(), false);
else
return activeQueries.assignQueryCounter(ctx.localNodeId(), 0L);
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync() {
- return requestSnapshotAsync((IgniteInternalTx)null);
- }
-
- /** {@inheritDoc} */
@Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx) {
MvccSnapshotFuture fut = new MvccSnapshotFuture();
@@ -585,7 +623,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
});
}
else if (tx != null)
- lsnr.onResponse(assignTxSnapshot(0L));
+ lsnr.onResponse(assignTxSnapshot(0L, ctx.localNodeId(), false));
else
lsnr.onResponse(activeQueries.assignQueryCounter(ctx.localNodeId(), 0L));
@@ -741,9 +779,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
first = false;
}
- for (MvccSnapshotResponseListener lsnr : map.values()) {
+ for (MvccSnapshotResponseListener lsnr : map.values())
U.warn(log, ">>> " + lsnr.toString());
- }
}
first = true;
@@ -909,10 +946,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
return activeQryTrackers;
}
- /**
- * @return Counter.
- */
- private MvccSnapshotResponse assignTxSnapshot(long futId) {
+ /** */
+ private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, boolean client) {
assert initFut.isDone();
assert crdVer != 0;
assert ctx.localNodeId().equals(currentCoordinatorId());
@@ -926,14 +961,16 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
tracking = ver;
cleanup = committedCntr.get() + 1;
- for (Map.Entry<Long, Long> txVer : activeTxs.entrySet()) {
- cleanup = Math.min(txVer.getValue(), cleanup);
- tracking = Math.min(txVer.getKey(), tracking);
+ for (Map.Entry<Long, ActiveTx> entry : activeTxs.entrySet()) {
+ cleanup = Math.min(entry.getValue().tracking, cleanup);
+ tracking = Math.min(entry.getKey(), tracking);
- res.addTx(txVer.getKey());
+ res.addTx(entry.getKey());
}
- boolean add = activeTxs.put(ver, tracking) == null;
+ ActiveTx activeTx = client ? new ActiveTx(tracking, nearId) : new ActiveServerTx(tracking, nearId);
+
+ boolean add = activeTxs.put(ver, activeTx) == null;
assert add : ver;
}
@@ -950,10 +987,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
return res;
}
- /**
- * @param txCntr Counter assigned to transaction.
- */
- private void onTxDone(Long txCntr, boolean committed) {
+ /** */
+ private void onTxDone(Long txCntr, boolean increaseCommittedCntr) {
assert initFut.isDone();
GridFutureAdapter fut;
@@ -961,7 +996,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
synchronized (this) {
activeTxs.remove(txCntr);
- if (committed)
+ if (increaseCommittedCntr)
committedCntr.setIfGreater(txCntr);
}
@@ -1352,10 +1387,14 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
return;
}
- MvccSnapshotResponse res = assignTxSnapshot(msg.futureId());
+ MvccSnapshotResponse res = assignTxSnapshot(msg.futureId(), nodeId, node.isClient());
+
+ boolean finishFailed = true;
try {
sendMessage(node.id(), res);
+
+ finishFailed = false;
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
@@ -1364,6 +1403,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
catch (IgniteCheckedException e) {
U.error(log, "Failed to send tx snapshot response [msg=" + msg + ", node=" + nodeId + ']', e);
}
+
+ if (finishFailed)
+ onTxDone(res.counter(), false);
}
/**
@@ -1390,9 +1432,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
-
onQueryDone(nodeId, res.tracking());
+
+ U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
}
}
@@ -1713,6 +1755,23 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
activeQueries.onNodeFailed(nodeId);
prevCrdQueries.onNodeFailed(nodeId);
+
+ recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
+ // Put synthetic vote from another failed node
+ ballotBox.vote(nodeId);
+
+ tryFinishRecoveryVoting(nearNodeId, ballotBox);
+ });
+
+ if (discoEvt.eventNode().isClient()) {
+ RecoveryBallotBox ballotBox = recoveryBallotBoxes
+ .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox());
+
+ ballotBox
+ .voters(discoEvt.topologyNodes().stream().map(ClusterNode::id).collect(Collectors.toList()));
+
+ tryFinishRecoveryVoting(nodeId, ballotBox);
+ }
}
/** {@inheritDoc} */
@@ -1767,6 +1826,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg);
else if (msg instanceof MvccActiveQueriesMessage)
processCoordinatorActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg);
+ else if (msg instanceof MvccRecoveryFinishedMessage)
+ processRecoveryFinishedMessage(nodeId, ((MvccRecoveryFinishedMessage)msg));
else
U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
}
@@ -1777,6 +1838,82 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
}
+    /**
+     * Collects transaction recovery votes for a node that has left the cluster.
+     * Transactions started by the left node stop being considered active once every expected
+     * voter has acknowledged that it has finished the transactions of the left node.
+     * All methods synchronize on the instance.
+     */
+    private static class RecoveryBallotBox {
+        /** Nodes expected to vote; {@code null} until set via {@link #voters(List)}. */
+        private List<UUID> voters;
+
+        /** Nodes that have already voted. */
+        private final Set<UUID> ballots = new HashSet<>();
+
+        /**
+         * @param expected Nodes which can have transactions started by the left node.
+         */
+        private synchronized void voters(List<UUID> expected) {
+            voters = expected;
+        }
+
+        /**
+         * @param voterId Voting node id.
+         */
+        private synchronized void vote(UUID voterId) {
+            ballots.add(voterId);
+        }
+
+        /**
+         * @return {@code True} if every expected voter has voted; {@code false} while voters are not set.
+         */
+        private synchronized boolean isVotingDone() {
+            return voters != null && ballots.containsAll(voters);
+        }
+    }
+
+    /**
+     * Registers a recovery vote: the sending node has finished the transactions of the left node.
+     *
+     * @param nodeId Node sent the message.
+     * @param msg Message carrying the id of the left (near) node.
+     */
+    private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) {
+        UUID leftNodeId = msg.nearNodeId();
+
+        RecoveryBallotBox box = recoveryBallotBoxes.computeIfAbsent(leftNodeId, id -> new RecoveryBallotBox());
+
+        box.vote(nodeId);
+
+        tryFinishRecoveryVoting(leftNodeId, box);
+    }
+
+    /**
+     * Finishes recovery on the coordinator by removing transactions started by the left node,
+     * provided that every expected voter has voted.
+     *
+     * @param nearNodeId Left node.
+     * @param ballotBox Votes accumulator for the left node.
+     */
+    private void tryFinishRecoveryVoting(UUID nearNodeId, RecoveryBallotBox ballotBox) {
+        if (!ballotBox.isVotingDone())
+            return;
+
+        // Atomically claim this exact ballot box. Votes and node-failure events may call this method
+        // concurrently. Only the caller that removes the box finishes recovery, and a different box
+        // mapped under the same key is never removed by mistake.
+        if (!recoveryBallotBoxes.remove(nearNodeId, ballotBox))
+            return;
+
+        List<Long> recoveredTxs;
+
+        synchronized (this) {
+            recoveredTxs = activeTxs.entrySet().stream()
+                .filter(e -> e.getValue().nearNodeId.equals(nearNodeId))
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+        }
+
+        // Committed counter is increased because it is not known if transaction was committed or not and we must
+        // bump committed counter for committed transaction as it is used in (read-only) query snapshot.
+        recoveredTxs.forEach(txCntr -> onTxDone(txCntr, true));
+    }
+
/** */
private interface Waiter {
/**
@@ -2324,4 +2461,26 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
}
}
+
+    /** Descriptor of a transaction that is active on the mvcc coordinator. */
+    private static class ActiveTx {
+        /** Tracking counter computed when the transaction snapshot was assigned. */
+        private final long tracking;
+
+        /** Id of the node that requested the transaction snapshot. */
+        private final UUID nearNodeId;
+
+        /**
+         * @param trackingCntr Tracking counter.
+         * @param nearId Near node id.
+         */
+        private ActiveTx(long trackingCntr, UUID nearId) {
+            tracking = trackingCntr;
+            nearNodeId = nearId;
+        }
+    }
+
+    /** Active transaction whose snapshot was requested by a server (non-client) node. */
+    private static class ActiveServerTx extends ActiveTx {
+        /**
+         * @param trackingCntr Tracking counter.
+         * @param srvNodeId Near (server) node id.
+         */
+        private ActiveServerTx(long trackingCntr, UUID srvNodeId) {
+            super(trackingCntr, srvNodeId);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 9441c17..972d4d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -242,7 +242,11 @@ public class MvccUtils {
if (mvccCntr > snapshotCntr) // we don't see future updates
return false;
- if (mvccCntr == snapshotCntr) {
+ // Normally we can make a fast visibility decision when the found row belongs to the same transaction.
+ // But we can't make such a decision for read-only queries,
+ // because read-only queries use the last committed version in their snapshot, which could actually be aborted
+ // (during transaction recovery we do not know whether a recovered transaction was committed or aborted).
+ if (mvccCntr == snapshotCntr && snapshotOpCntr != MVCC_READ_OP_CNTR) {
assert opCntr <= snapshotOpCntr : "rowVer=" + mvccVersion(mvccCrd, mvccCntr, opCntr) + ", snapshot=" + snapshot;
return opCntr < snapshotOpCntr; // we don't see own pending updates
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java
new file mode 100644
index 0000000..a4ea103
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Recovery vote: tells the mvcc coordinator that the sending node has finished
+ * the transactions of a node that left the cluster.
+ */
+public class MvccRecoveryFinishedMessage implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = -505062368078979867L;
+
+ /** Id of the left (near) node whose transactions were recovered. */
+ private UUID nearNodeId;
+
+ /** Empty constructor; the field is populated by {@link #readFrom}. */
+ public MvccRecoveryFinishedMessage() {
+ }
+
+ /**
+ * @param nearNodeId Id of the left node whose transactions were recovered.
+ */
+ public MvccRecoveryFinishedMessage(UUID nearNodeId) {
+ this.nearNodeId = nearNodeId;
+ }
+
+ /**
+ * @return Left node id for which transactions were recovered.
+ */
+ public UUID nearNodeId() {
+ return nearNodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ nearNodeId = reader.readUuid("nearNodeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccRecoveryFinishedMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 164;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
new file mode 100644
index 0000000..ffd9a67
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Carries partition update counters to a neighbor node. The receiver applies the counters
+ * and acknowledges them with a {@link PartitionCountersNeighborcastResponse} bearing the same future id.
+ */
+public class PartitionCountersNeighborcastRequest extends GridCacheIdMessage {
+ /** */
+ private static final long serialVersionUID = -1893577108462486998L;
+
+ /** Partition update counters to apply on the receiving node. */
+ @GridDirectCollection(PartitionUpdateCountersMessage.class)
+ private Collection<PartitionUpdateCountersMessage> updCntrs;
+
+ /** Id of the sending future awaiting the response. */
+ private IgniteUuid futId;
+
+ /** Empty constructor; fields are populated by {@link #readFrom}. */
+ public PartitionCountersNeighborcastRequest() {
+ }
+
+ /**
+ * @param updCntrs Partition update counters for the remote node.
+ * @param futId Sending future id.
+ */
+ public PartitionCountersNeighborcastRequest(
+ Collection<PartitionUpdateCountersMessage> updCntrs, IgniteUuid futId) {
+ this.updCntrs = updCntrs;
+ this.futId = futId;
+ }
+
+ /**
+ * @return Partition update counters for remote node.
+ */
+ public Collection<PartitionUpdateCountersMessage> updateCounters() {
+ return updCntrs;
+ }
+
+ /**
+ * @return Sending future id.
+ */
+ public IgniteUuid futId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // States below 3 are presumably consumed by the superclass fields written in super.writeTo().
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("updCntrs", updCntrs, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ updCntrs = reader.readCollection("updCntrs", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(PartitionCountersNeighborcastRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 165;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
new file mode 100644
index 0000000..547539d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Acknowledges that a {@link PartitionCountersNeighborcastRequest} has been processed;
+ * carries the id of the future that sent the request.
+ */
+public class PartitionCountersNeighborcastResponse extends GridCacheIdMessage {
+ /** */
+ private static final long serialVersionUID = -8731050539139260521L;
+
+ /** Id of the sending future to notify. */
+ private IgniteUuid futId;
+
+ /** Empty constructor; the field is populated by {@link #readFrom}. */
+ public PartitionCountersNeighborcastResponse() {
+ }
+
+ /**
+ * @param futId Sending future id taken from the request.
+ */
+ public PartitionCountersNeighborcastResponse(IgniteUuid futId) {
+ this.futId = futId;
+ }
+
+ /**
+ * @return Sending future id.
+ */
+ public IgniteUuid futId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // States below 3 are presumably consumed by the superclass fields written in super.writeTo().
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(PartitionCountersNeighborcastResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 166;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cb682f6..240fbbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1681,6 +1681,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+    @Override public void finalizeUpdateCountres() {
+        try {
+            // init0(true) may yield no store; there is nothing to finalize in that case.
+            CacheDataStore store = init0(true);
+
+            if (store == null)
+                return;
+
+            store.finalizeUpdateCountres();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+ /** {@inheritDoc} */
@Override public long nextUpdateCounter() {
try {
CacheDataStore delegate0 = init0(false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 0d3ba75..399359b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -91,7 +91,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -283,7 +282,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
private volatile IgniteInternalFuture rollbackFut;
/** */
- private volatile TxCounters txCounters = new TxCounters();
+ private volatile TxCounters txCounters;
/**
* Empty constructor required for {@link Externalizable}.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 895a9d1..75e2087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -47,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecove
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -74,6 +77,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.EnlistOperation;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -257,6 +262,20 @@ public class IgniteTxHandler {
processCheckPreparedTxResponse(nodeId, res);
}
});
+
+ ctx.io().addCacheHandler(0, PartitionCountersNeighborcastRequest.class,
+ new CI2<UUID, PartitionCountersNeighborcastRequest>() {
+ @Override public void apply(UUID nodeId, PartitionCountersNeighborcastRequest req) {
+ processPartitionCountersRequest(nodeId, req);
+ }
+ });
+
+ ctx.io().addCacheHandler(0, PartitionCountersNeighborcastResponse.class,
+ new CI2<UUID, PartitionCountersNeighborcastResponse>() {
+ @Override public void apply(UUID nodeId, PartitionCountersNeighborcastResponse res) {
+ processPartitionCountersResponse(nodeId, res);
+ }
+ });
}
/**
@@ -2152,4 +2171,110 @@ public class IgniteTxHandler {
fut.onResult(nodeId, res);
}
+
+    /**
+     * Handles a partition counters neighborcast request: applies the update counters carried by the
+     * request to local partitions and then replies to the sender with a response bearing the
+     * request's future id.
+     *
+     * @param nodeId Id of the node that sent the request.
+     * @param req Request.
+     */
+    private void processPartitionCountersRequest(UUID nodeId, PartitionCountersNeighborcastRequest req) {
+        applyPartitionsUpdatesCounters(req.updateCounters());
+
+        try {
+            PartitionCountersNeighborcastResponse ack = new PartitionCountersNeighborcastResponse(req.futId());
+
+            ctx.io().send(nodeId, ack, SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            // The requesting node has left the topology, so the response cannot be delivered.
+            if (txRecoveryMsgLog.isDebugEnabled())
+                txRecoveryMsgLog.debug("Failed to send partition counters response, node left [node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(txRecoveryMsgLog, "Failed to send partition counters response [node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     * Handles a response to a partition counters neighborcast request by looking up the local
+     * {@code PartitionCountersNeighborcastFuture} registered under the response's future id and
+     * reporting the responding node to it. Logs a warning if no such future is registered.
+     *
+     * @param nodeId Id of the node that sent the response.
+     * @param res Response.
+     */
+    private void processPartitionCountersResponse(UUID nodeId, PartitionCountersNeighborcastResponse res) {
+        PartitionCountersNeighborcastFuture fut =
+            (PartitionCountersNeighborcastFuture)ctx.mvcc().future(res.futId());
+
+        if (fut != null) {
+            fut.onResult(nodeId);
+
+            return;
+        }
+
+        log.warning("Failed to find future for partition counters response [futId=" + res.futId() +
+            ", node=" + nodeId + ']');
+    }
+
+    /**
+     * Applies partition update counters of mvcc transactions to the corresponding local partitions.
+     * For every entry of every message, the (initial counter, updates count) pair is passed to the
+     * local partition's {@code updateCounter}. A {@code null} argument is ignored.
+     * <p>
+     * Assertions (checked only when enabled) require each referenced cache to be mvcc-enabled and
+     * each referenced partition to be present locally.
+     *
+     * @param counters Counter values to be updated, may be {@code null}.
+     */
+    public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters) {
+        if (counters == null)
+            return;
+
+        // The topology is looked up again only when the cache id changes between consecutive messages.
+        int curCacheId = CU.UNDEFINED_CACHE_ID;
+        GridDhtPartitionTopology curTop = null;
+
+        for (PartitionUpdateCountersMessage msg : counters) {
+            int msgCacheId = msg.cacheId();
+
+            if (msgCacheId != curCacheId) {
+                curCacheId = msgCacheId;
+
+                GridCacheContext cacheCtx = ctx.cacheContext(curCacheId);
+
+                assert cacheCtx.mvccEnabled();
+
+                curTop = cacheCtx.topology();
+            }
+
+            assert curTop != null;
+
+            for (int idx = 0; idx < msg.size(); idx++) {
+                GridDhtLocalPartition locPart = curTop.localPartition(msg.partition(idx));
+
+                assert locPart != null;
+
+                locPart.updateCounter(msg.initialCounter(idx), msg.updatesCount(idx));
+            }
+        }
+    }
+
+    /**
+     * Selects those partition update counters of the given transaction that belong to partitions
+     * for which {@code node} is a backup, according to cache affinity at the transaction's
+     * topology version snapshot.
+     *
+     * @param tx Transaction.
+     * @param node Backup node.
+     * @return {@code null} if the transaction has no update counters; otherwise one message per cache
+     *      that has at least one matching partition (the list may be empty).
+     */
+    @Nullable public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(
+        IgniteInternalTx tx, ClusterNode node) {
+        TxCounters txCntrs = tx.txCounters(false);
+
+        if (txCntrs == null)
+            return null;
+
+        Collection<PartitionUpdateCountersMessage> allCntrs = txCntrs.updateCounters();
+
+        if (F.isEmpty(allCntrs))
+            return null;
+
+        AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+        List<PartitionUpdateCountersMessage> backupCntrs = new ArrayList<>(allCntrs.size());
+
+        for (PartitionUpdateCountersMessage cacheCntrs : allCntrs) {
+            int cacheId = cacheCntrs.cacheId();
+
+            GridCacheAffinityManager aff = ctx.cacheContext(cacheId).affinity();
+
+            PartitionUpdateCountersMessage filtered = new PartitionUpdateCountersMessage(cacheId, cacheCntrs.size());
+
+            for (int i = 0; i < cacheCntrs.size(); i++) {
+                int p = cacheCntrs.partition(i);
+
+                // Skip partitions this node does not back up.
+                if (!aff.backupByPartition(node, p, topVer))
+                    continue;
+
+                filtered.add(p, cacheCntrs.initialCounter(i), cacheCntrs.updatesCount(i));
+            }
+
+            if (filtered.size() > 0)
+                backupCntrs.add(filtered);
+        }
+
+        return backupCntrs;
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 27b1522..0c2ca34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -36,12 +36,12 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture;
@@ -59,19 +59,18 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -106,6 +105,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_TX_STARTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
@@ -254,18 +254,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
};
- cctx.gridEvents().addLocalEventListener(
- new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent;
+ cctx.gridEvents().addDiscoveryEventListener(
+ new DiscoveryEventListener() {
+ @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- UUID nodeId = discoEvt.eventNode().id();
+ UUID nodeId = evt.eventNode().id();
// Wait some time in case there are some unprocessed messages from failed node.
- cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
+ cctx.time().addTimeoutObject(
+ new NodeFailureTimeoutObject(evt.eventNode(), discoCache.mvccCoordinator()));
if (txFinishSync != null)
txFinishSync.onNodeLeft(nodeId);
@@ -2026,7 +2024,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*/
public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
if (log.isInfoEnabled())
- log.info("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']');
+ log.info("Finishing prepared transaction [commit=" + commit + ", tx=" + tx + ']');
if (!tx.markFinalizing(RECOVERY_FINISH)) {
if (log.isInfoEnabled())
@@ -2046,10 +2044,28 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (commit)
tx.commitAsync().listen(new CommitListener(tx));
+ else if (tx.mvccSnapshot() != null && !tx.local())
+ // remote (backup) mvcc transaction sends partition counters to other backup transaction
+ // in order to keep counters consistent
+ neighborcastPartitionCountersAndRollback(tx);
else
tx.rollbackAsync();
}
+    /**
+     * Rolls back a remote (backup) mvcc transaction during recovery. If the transaction carries
+     * partition update counters, they are first sent to neighbour nodes via a
+     * {@code PartitionCountersNeighborcastFuture}, and the rollback is started once that future
+     * completes. Otherwise the transaction is rolled back immediately.
+     *
+     * @param tx Transaction to roll back.
+     */
+    private void neighborcastPartitionCountersAndRollback(IgniteInternalTx tx) {
+        TxCounters txCounters = tx.txCounters(false);
+
+        if (txCounters == null || txCounters.updateCounters() == null) {
+            // Nothing to neighborcast: roll back right away. Returning here prevents starting a
+            // neighborcast future for a tx without counters and a second rollbackAsync() call.
+            tx.rollbackAsync();
+
+            return;
+        }
+
+        PartitionCountersNeighborcastFuture fut = new PartitionCountersNeighborcastFuture(tx, cctx);
+
+        // Roll back only after the neighborcast future has completed.
+        fut.listen(fut0 -> tx.rollbackAsync());
+
+        fut.init();
+    }
+
/**
* Commits transaction in case when node started transaction failed, but all related
* transactions were prepared (invalidates transaction if it is not fully prepared).
@@ -2427,16 +2443,20 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* Timeout object for node failure handler.
*/
private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
- /** Left or failed node. */
- private final UUID evtNodeId;
+ /** Left or failed node. */
+ private final ClusterNode node;
+ /** Mvcc coordinator at the time of node failure. May be {@code null}. */
+ private final MvccCoordinator mvccCrd;
/**
- * @param evtNodeId Event node ID.
+ * @param node Failed node.
+ * @param mvccCrd Mvcc coordinator at time of node failure (may be {@code null}).
*/
- private NodeFailureTimeoutObject(UUID evtNodeId) {
+ private NodeFailureTimeoutObject(ClusterNode node, MvccCoordinator mvccCrd) {
super(IgniteUuid.fromUuid(cctx.localNodeId()), TX_SALVAGE_TIMEOUT);
- this.evtNodeId = evtNodeId;
+ this.node = node;
+ this.mvccCrd = mvccCrd;
}
/**
@@ -2453,11 +2473,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return;
}
+ UUID evtNodeId = node.id();
+
try {
if (log.isDebugEnabled())
log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() +
", failedNodeId=" + evtNodeId + ']');
+ // Null means that recovery voting is not needed.
+ GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = node.isClient() && mvccCrd != null
+ ? new GridCompoundFuture<>() : null;
+
for (final IgniteInternalTx tx : activeTransactions()) {
if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) {
// Invalidate transactions.
@@ -2472,24 +2498,57 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null) {
- prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- if (tx.state() == PREPARED)
- commitIfPrepared(tx, Collections.singleton(evtNodeId));
- else if (tx.setRollbackOnly())
- tx.rollbackAsync();
- }
+ prepFut.listen(fut -> {
+ if (tx.state() == PREPARED)
+ commitIfPrepared(tx, Collections.singleton(evtNodeId));
+ // If we could not mark tx as rollback, it means that transaction is being committed.
+ else if (tx.setRollbackOnly())
+ tx.rollbackAsync();
});
}
- else {
- // If we could not mark tx as rollback, it means that transaction is being committed.
- if (tx.setRollbackOnly())
- tx.rollbackAsync();
- }
+ // If we could not mark tx as rollback, it means that transaction is being committed.
+ else if (tx.setRollbackOnly())
+ tx.rollbackAsync();
}
}
+
+ // Await only mvcc transactions initiated by failed client node.
+ if (allTxFinFut != null && tx.eventNodeId().equals(evtNodeId)
+ && tx.mvccSnapshot() != null)
+ allTxFinFut.add(tx.finishFuture());
}
}
+
+ if (allTxFinFut == null)
+ return;
+
+ allTxFinFut.markInitialized();
+
+ // Send vote to mvcc coordinator when all recovering transactions have finished.
+ allTxFinFut.listen(fut -> {
+ // If mvcc coordinator issued snapshot for recovering transaction has failed during recovery,
+ // then there is no need to send messages to new coordinator.
+ try {
+ cctx.kernalContext().io().sendToGridTopic(
+ mvccCrd.nodeId(),
+ TOPIC_CACHE_COORDINATOR,
+ new MvccRecoveryFinishedMessage(evtNodeId),
+ SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isInfoEnabled())
+ log.info("Mvcc coordinator issued snapshots for recovering transactions " +
+ "has left the cluster (will ignore) [locNodeId=" + cctx.localNodeId() +
+ ", failedNodeId=" + evtNodeId +
+ ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']');
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Failed to notify mvcc coordinator that all recovering transactions were " +
+ "finished [locNodeId=" + cctx.localNodeId() +
+ ", failedNodeId=" + evtNodeId +
+ ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']', e);
+ }
+ });
}
finally {
cctx.kernalContext().gateway().readUnlock();