You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/31 14:48:02 UTC
[1/2] ignite git commit: ignite-3478
Repository: ignite
Updated Branches:
refs/heads/ignite-3478 855c2d457 -> 08be7310a
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
new file mode 100644
index 0000000..40354a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ *
+ */
+class CoordinatorAssignmentHistory {
+ /** */
+ private volatile Map<AffinityTopologyVersion, ClusterNode> assignHist = Collections.emptyMap();
+
+ /** */
+ private volatile IgniteBiTuple<AffinityTopologyVersion, ClusterNode>
+ cur = new IgniteBiTuple<>(AffinityTopologyVersion.NONE, null);
+
+ void addAssignment(AffinityTopologyVersion topVer, ClusterNode crd) {
+ assert !assignHist.containsKey(topVer);
+ assert topVer.compareTo(cur.get1()) > 0;
+
+ cur = new IgniteBiTuple<>(topVer, crd);
+
+ Map<AffinityTopologyVersion, ClusterNode> hist = new HashMap<>(assignHist);
+
+ hist.put(topVer, crd);
+
+ assignHist = hist;
+
+ }
+
+ ClusterNode currentCoordinator() {
+ return cur.get2();
+ }
+
+ ClusterNode coordinator(AffinityTopologyVersion topVer) {
+ assert topVer.initialized() : topVer;
+
+ IgniteBiTuple<AffinityTopologyVersion, ClusterNode> cur0 = cur;
+
+ if (cur0.get1().equals(topVer))
+ return cur0.get2();
+
+ Map<AffinityTopologyVersion, ClusterNode> assignHist0 = assignHist;
+
+ assert assignHist.containsKey(topVer) :
+ "No coordinator assignment [topVer=" + topVer + ", curVer=" + cur0.get1() + ", hist=" + assignHist0.keySet() + ']';
+
+ return assignHist0.get(topVer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 7598003..dfe0e06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -634,4 +634,9 @@ public interface IgniteInternalTx {
* @param e Commit error.
*/
public void commitError(Throwable e);
+
+ /**
+ * @param mvccCrdCntr Update counter assigned by MVCC coordinator.
+ */
+ public void mvccCoordinatorCounter(long mvccCrdCntr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 4d85db5..8ad717a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -245,6 +246,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/** Store used flag. */
protected boolean storeEnabled = true;
+ /** */
+ private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -1525,6 +1529,33 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
return (taskName = cctx.kernalContext().task().resolveTaskName(taskNameHash));
}
+ /** {@inheritDoc} */
+ public final void mvccCoordinatorCounter(long mvccCrdCntr) {
+ this.mvccCrdCntr = mvccCrdCntr;
+ }
+
+ /**
+ * @return Coordinator counter.
+ */
+ public final long mvccCoordinatorCounter() {
+ return mvccCrdCntr;
+ }
+
+ /**
+ * @return Mvcc version.
+ */
+ protected final TxMvccVersion createMvccVersion() {
+ assert !txState().mvccEnabled(cctx) || mvccCrdCntr != TxMvccVersion.COUNTER_NA : mvccCrdCntr;
+
+ if (mvccCrdCntr != TxMvccVersion.COUNTER_NA) {
+ return new TxMvccVersion(topologyVersion().topologyVersion(),
+ mvccCrdCntr,
+ nearXidVersion());
+ }
+
+ return null;
+ }
+
/**
* Resolve DR conflict.
*
@@ -1824,6 +1855,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
+ @Override public void mvccCoordinatorCounter(long mvccCrdCntr) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public boolean localResult() {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index beeb184..cac1069 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -254,11 +254,7 @@ public class IgniteTxHandler {
) {
req.txState(locTx.txState());
- IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
- req.reads(),
- req.writes(),
- req.transactionNodes(),
- req.last());
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req);
if (locTx.isRollbackOnly())
locTx.rollbackNearTxLocalAsync();
@@ -520,14 +516,7 @@ public class IgniteTxHandler {
if (req.needReturnValue())
tx.needReturnValue(true);
- IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
- req.reads(),
- req.writes(),
- req.dhtVersions(),
- req.messageId(),
- req.miniId(),
- req.transactionNodes(),
- req.last());
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(req);
if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
@@ -1322,6 +1311,7 @@ public class IgniteTxHandler {
tx.commitVersion(req.commitVersion());
tx.invalidate(req.isInvalidate());
tx.systemInvalidate(req.isSystemInvalidate());
+ tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
// Complete remote candidates.
tx.doneRemote(req.baseVersion(), null, null, null);
@@ -1368,6 +1358,7 @@ public class IgniteTxHandler {
try {
tx.commitVersion(req.writeVersion());
tx.invalidate(req.isInvalidate());
+ tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
// Complete remote candidates.
tx.doneRemote(req.version(), null, null, null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 10b06d8..5efe225 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -289,6 +289,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+ GridCacheContext ctx0 = cacheCtx;
+
+ return ctx0 != null && ctx0.mvccEnabled();
+ }
+
+ /** {@inheritDoc} */
public String toString() {
return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index e7ebaae..836eecc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -507,6 +508,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
try {
cctx.tm().txContext(this);
+ TxMvccVersion mvccVer = createMvccVersion();
+
AffinityTopologyVersion topVer = topologyVersion();
/*
@@ -684,7 +687,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- null);
+ null,
+ mvccVer);
if (updRes.success())
txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -711,7 +715,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- null);
+ null,
+ mvccVer);
}
}
else if (op == DELETE) {
@@ -732,7 +737,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- null);
+ null,
+ mvccVer);
if (updRes.success())
txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -755,7 +761,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- null);
+ null,
+ mvccVer);
}
}
else if (op == RELOAD) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
index b61a99c..2fe63fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
@@ -142,4 +142,9 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+ return entry != null ? entry.context().mvccEnabled() : false;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
index 1326491..1b6c656 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
@@ -209,4 +209,14 @@ public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+ for (IgniteTxEntry e : writeMap.values()) {
+ if (e.context().mvccEnabled())
+ return true;
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 1fe0d2a..29cd728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -180,4 +180,10 @@ public interface IgniteTxState {
* @return {@code True} if transaction is empty.
*/
public boolean empty();
+
+ /**
+ * @param cctx Context.
+ * @return {@code True} if MVCC mode is enabled for transaction.
+ */
+ public boolean mvccEnabled(GridCacheSharedContext cctx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 4f14b5c..ea0cde4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -462,6 +462,15 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+ assert !activeCacheIds.isEmpty();
+
+ int cacheId = activeCacheIds.get(0);
+
+ return cctx.cacheContext(cacheId).mvccEnabled();
+ }
+
+ /** {@inheritDoc} */
public String toString() {
return S.toString(IgniteTxStateImpl.class, this, "txMap", allEntriesCopy());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 6712b5b..f0e19c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.eviction.EvictableEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -456,7 +457,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr)
+ @Nullable Long updateCntr,
+ @Nullable TxMvccVersion mvccVer
+ )
throws IgniteCheckedException, GridCacheEntryRemovedException {
return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
}
@@ -536,7 +539,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr
+ @Nullable Long updateCntr,
+ @Nullable TxMvccVersion mvccVer
) throws IgniteCheckedException, GridCacheEntryRemovedException {
obsoleteVer = ver;
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index ee6cfd0..7920e0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -204,7 +204,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
CacheMode cacheMode,
CacheWriteSynchronizationMode syncMode,
int backups) {
- CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL);
[2/2] ignite git commit: ignite-3478
Posted by sb...@apache.org.
ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08be7310
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08be7310
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08be7310
Branch: refs/heads/ignite-3478
Commit: 08be7310a93d3ce455215b97cf8ab1a2c3f0ab31
Parents: 855c2d4
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 31 12:52:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 31 16:59:15 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 1 -
.../discovery/GridDiscoveryManager.java | 3 -
.../processors/cache/GridCacheEntryEx.java | 7 +-
.../processors/cache/GridCacheMapEntry.java | 7 +-
.../GridDistributedTxRemoteAdapter.java | 12 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 12 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 49 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 47 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 487 ++++++++++---------
.../dht/GridDhtTxPrepareRequest.java | 63 ++-
.../GridDhtPartitionsExchangeFuture.java | 2 +
.../GridNearPessimisticTxPrepareFuture.java | 37 ++
.../near/GridNearTxFinishFuture.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 20 +-
.../near/GridNearTxPrepareRequest.java | 17 +
.../near/GridNearTxPrepareResponse.java | 18 +
.../mvcc/CacheCoordinatorsSharedManager.java | 77 ++-
.../mvcc/CoordinatorAssignmentHistory.java | 71 +++
.../cache/transactions/IgniteInternalTx.java | 5 +
.../cache/transactions/IgniteTxAdapter.java | 36 ++
.../cache/transactions/IgniteTxHandler.java | 17 +-
.../IgniteTxImplicitSingleStateImpl.java | 7 +
.../transactions/IgniteTxLocalAdapter.java | 15 +-
.../IgniteTxRemoteSingleStateImpl.java | 5 +
.../transactions/IgniteTxRemoteStateImpl.java | 10 +
.../cache/transactions/IgniteTxState.java | 6 +
.../cache/transactions/IgniteTxStateImpl.java | 9 +
.../processors/cache/GridCacheTestEntryEx.java | 8 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 2 +-
29 files changed, 714 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4b57eb8..95e855a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.managers.discovery;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8e3f9fc..cbd2738 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,7 +30,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -43,9 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.zip.CRC32;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index b2cabac..5b97195 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -381,7 +382,8 @@ public interface GridCacheEntryEx {
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr
+ @Nullable Long updateCntr,
+ @Nullable TxMvccVersion mvccVer
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -423,7 +425,8 @@ public interface GridCacheEntryEx {
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr
+ @Nullable Long updateCntr,
+ @Nullable TxMvccVersion mvccVer
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 61f6fb4..5336b22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
@@ -888,7 +889,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr
+ @Nullable Long updateCntr,
+ @Nullable TxMvccVersion mvccVer
) throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject old;
@@ -1082,7 +1084,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr
+ @Nullable Long updateCntr,
+ @Nullable TxMvccVersion mvccVer
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.transactional();
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index ea6461d..db1e2dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWra
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -474,6 +475,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
cctx.database().checkpointReadLock();
try {
+ TxMvccVersion mvccVer = createMvccVersion();
+
Collection<IgniteTxEntry> entries = near() ? allEntries() : writeEntries();
List<DataEntry> dataEntries = null;
@@ -594,7 +597,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- txEntry.updateCounter());
+ txEntry.updateCounter(),
+ mvccVer);
else {
assert val != null : txEntry;
@@ -618,7 +622,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- txEntry.updateCounter());
+ txEntry.updateCounter(),
+ mvccVer);
// Keep near entry up to date.
if (nearCached != null) {
@@ -650,7 +655,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- txEntry.updateCounter());
+ txEntry.updateCounter(),
+ mvccVer);
// Keep near entry up to date.
if (nearCached != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5311ddc..9ca1412 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -347,7 +348,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
tx.taskNameHash(),
tx.activeCachesDeploymentEnabled(),
false,
- false);
+ false,
+ TxMvccVersion.COUNTER_NA);
try {
cctx.io().send(n, req, tx.ioPolicy());
@@ -395,6 +397,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
if (tx.onePhaseCommit())
return false;
+ assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA;
+
boolean sync = tx.syncMode() == FULL_SYNC;
if (tx.explicitLock())
@@ -450,7 +454,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
tx.activeCachesDeploymentEnabled(),
updCntrs,
false,
- false);
+ false,
+ tx.mvccCoordinatorCounter());
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
@@ -519,7 +524,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
tx.taskNameHash(),
tx.activeCachesDeploymentEnabled(),
false,
- false);
+ false,
+ tx.mvccCoordinatorCounter());
req.writeVersion(tx.writeVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 90f3687..976a534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -66,6 +67,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** One phase commit write version. */
private GridCacheVersion writeVer;
+ /** */
+ private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -121,7 +125,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
int taskNameHash,
boolean addDepInfo,
boolean retVal,
- boolean waitRemoteTxs
+ boolean waitRemoteTxs,
+ long mvccCrdCntr
) {
super(
xidVer,
@@ -150,6 +155,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
this.nearNodeId = nearNodeId;
this.isolation = isolation;
this.miniId = miniId;
+ this.mvccCrdCntr = mvccCrdCntr;
needReturnValue(retVal);
waitRemoteTransactions(waitRemoteTxs);
@@ -206,7 +212,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean addDepInfo,
Collection<Long> updateIdxs,
boolean retVal,
- boolean waitRemoteTxs
+ boolean waitRemoteTxs,
+ long mvccCrdCntr
) {
this(nearNodeId,
futId,
@@ -231,7 +238,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
taskNameHash,
addDepInfo,
retVal,
- waitRemoteTxs);
+ waitRemoteTxs,
+ mvccCrdCntr);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -242,6 +250,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
+ * @return Counter.
+ */
+ public long mvccCoordinatorCounter() {
+ return mvccCrdCntr;
+ }
+
+ /**
* @return Partition update counters.
*/
public GridLongList partUpdateCounters(){
@@ -367,24 +382,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
writer.incrementState();
case 23:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr))
return false;
writer.incrementState();
case 24:
- if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 25:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 26:
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 27:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -427,7 +448,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 23:
- nearNodeId = reader.readUuid("nearNodeId");
+ mvccCrdCntr = reader.readLong("mvccCrdCntr");
if (!reader.isLastRead())
return false;
@@ -435,7 +456,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 24:
- partUpdateCnt = reader.readMessage("partUpdateCnt");
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -443,7 +464,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 25:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+ partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
return false;
@@ -451,6 +472,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 26:
+ pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 27:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -470,7 +499,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 27;
+ return 28;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 5b8a7b5..44e2a54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -313,24 +314,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
/**
* Prepares next batch of entries in dht transaction.
*
- * @param reads Read entries.
- * @param writes Write entries.
- * @param verMap Version map.
- * @param msgId Message ID.
- * @param nearMiniId Near mini future ID.
- * @param txNodes Transaction nodes mapping.
- * @param last {@code True} if this is last prepare request.
+ * @param req Prepare request.
* @return Future that will be completed when locks are acquired.
*/
- public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
- @Nullable Collection<IgniteTxEntry> reads,
- @Nullable Collection<IgniteTxEntry> writes,
- Map<IgniteTxKey, GridCacheVersion> verMap,
- long msgId,
- int nearMiniId,
- Map<UUID, Collection<UUID>> txNodes,
- boolean last
- ) {
+ public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(GridNearTxPrepareRequest req) {
// In optimistic mode prepare still can be called explicitly from salvageTx.
GridDhtTxPrepareFuture fut = prepFut;
@@ -344,14 +331,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
cctx,
this,
timeout,
- nearMiniId,
- verMap,
- last,
+ req.miniId(),
+ req.dhtVersions(),
+ req.last(),
needReturnValue()))) {
GridDhtTxPrepareFuture f = prepFut;
- assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
- "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
+ assert f.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+ "[futMiniId=" + f.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + f + ']';
if (timeout == -1)
f.onError(timeoutException());
@@ -360,8 +347,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
else {
- assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
- "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
+ assert fut.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+ "[futMiniId=" + fut.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + fut + ']';
// Prepare was called explicitly.
return chainOnePhasePrepare(fut);
@@ -389,14 +376,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
try {
- if (reads != null) {
- for (IgniteTxEntry e : reads)
- addEntry(msgId, e);
+ if (req.reads() != null) {
+ for (IgniteTxEntry e : req.reads())
+ addEntry(req.messageId(), e);
}
- if (writes != null) {
- for (IgniteTxEntry e : writes)
- addEntry(msgId, e);
+ if (req.writes() != null) {
+ for (IgniteTxEntry e : req.writes())
+ addEntry(req.messageId(), e);
}
userPrepare(null);
@@ -407,7 +394,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (isSystemInvalidate())
fut.complete();
else
- fut.prepare(reads, writes, txNodes);
+ fut.prepare(req);
}
catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
fut.onError(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 03d99fc..a3d67d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -84,6 +86,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -168,14 +171,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
@SuppressWarnings("UnusedDeclaration")
private volatile int mapped;
- /** Prepare reads. */
- private Iterable<IgniteTxEntry> reads;
-
- /** Prepare writes. */
- private Iterable<IgniteTxEntry> writes;
-
- /** Tx nodes. */
- private Map<UUID, Collection<UUID>> txNodes;
+ /** Prepare request. */
+ private GridNearTxPrepareRequest req;
/** Trackable flag. */
private boolean trackable = true;
@@ -341,7 +338,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
private void onEntriesLocked() {
ret = new GridCacheReturn(null, tx.localResult(), true, null, true);
- for (IgniteTxEntry writeEntry : writes) {
+ for (IgniteTxEntry writeEntry : req.writes()) {
IgniteTxEntry txEntry = tx.entry(writeEntry.txKey());
assert txEntry != null : writeEntry;
@@ -597,10 +594,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
if (log.isDebugEnabled())
log.debug("Marking all local candidates as ready: " + this);
- readyLocks(writes);
+ readyLocks(req.writes());
if (tx.serializable() && tx.optimistic())
- readyLocks(reads);
+ readyLocks(req.reads());
locksReady = true;
}
@@ -869,6 +866,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
tx.onePhaseCommit(),
tx.activeCachesDeploymentEnabled());
+ res.mvccCoordinatorCounter(tx.mvccCoordinatorCounter());
+
if (prepErr == null) {
if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor())
addDhtValues(res);
@@ -896,8 +895,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
*/
private void addDhtValues(GridNearTxPrepareResponse res) {
// Interceptor on near node needs old values to execute callbacks.
- if (!F.isEmpty(writes)) {
- for (IgniteTxEntry e : writes) {
+ if (!F.isEmpty(req.writes())) {
+ for (IgniteTxEntry e : req.writes()) {
IgniteTxEntry txEntry = tx.entry(e.txKey());
assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
@@ -1002,33 +1001,30 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
/**
* Initializes future.
*
- * @param reads Read entries.
- * @param writes Write entries.
- * @param txNodes Transaction nodes mapping.
+ * @param req Prepare request.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
- Map<UUID, Collection<UUID>> txNodes) {
+ public void prepare(GridNearTxPrepareRequest req) {
+ assert req != null;
+
if (tx.empty()) {
tx.setRollbackOnly();
onDone((GridNearTxPrepareResponse)null);
}
- this.reads = reads;
- this.writes = writes;
- this.txNodes = txNodes;
+ this.req = req;
boolean ser = tx.serializable() && tx.optimistic();
- if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) {
+ if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) {
Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
- for (IgniteTxEntry entry : writes)
+ for (IgniteTxEntry entry : req.writes())
forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
if (ser) {
- for (IgniteTxEntry entry : reads)
+ for (IgniteTxEntry entry : req.reads())
forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
}
@@ -1191,15 +1187,17 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
*
*/
private void prepare0() {
+ boolean skipInit = false;
+
try {
if (tx.serializable() && tx.optimistic()) {
IgniteCheckedException err0;
try {
- err0 = checkReadConflict(writes);
+ err0 = checkReadConflict(req.writes());
if (err0 == null)
- err0 = checkReadConflict(reads);
+ err0 = checkReadConflict(req.reads());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to check entry version: " + e, e);
@@ -1225,264 +1223,317 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
}
}
+ IgniteInternalFuture<Long> waitCoordCntrFut = null;
+
+ if (req.requestMvccCounter()) {
+ assert tx.txState().mvccEnabled(cctx);
+
+ ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion());
+
+ assert crd != null : tx.topologyVersion();
+
+ if (crd.isLocal())
+ tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion()));
+ else {
+ IgniteInternalFuture<Long> coordCntrFut = cctx.coordinators().requestTxCounter(crd, tx);
+
+ if (tx.onePhaseCommit())
+ waitCoordCntrFut = coordCntrFut;
+ }
+ }
+
// We are holding transaction-level locks for entries here, so we can get next write version.
onEntriesLocked();
// We are holding transaction-level locks for entries here, so we can get next write version.
tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
- {
- // Assign keys to primary nodes.
- if (!F.isEmpty(writes)) {
- for (IgniteTxEntry write : writes)
- map(tx.entry(write.txKey()));
- }
+ // Assign keys to primary nodes.
+ if (!F.isEmpty(req.writes())) {
+ for (IgniteTxEntry write : req.writes())
+ map(tx.entry(write.txKey()));
+ }
- if (!F.isEmpty(reads)) {
- for (IgniteTxEntry read : reads)
- map(tx.entry(read.txKey()));
- }
+ if (!F.isEmpty(req.reads())) {
+ for (IgniteTxEntry read : req.reads())
+ map(tx.entry(read.txKey()));
}
if (isDone())
return;
if (last) {
- if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
- for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
- if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
- tx.onePhaseCommit(false);
+ if (waitCoordCntrFut != null) {
+ skipInit = true;
- break;
- }
- }
- }
-
- int miniId = 0;
+ waitCoordCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() {
+ @Override public void apply(IgniteInternalFuture<Long> fut) {
+ try {
+ fut.get();
- assert tx.transactionNodes() != null;
+ sendPrepareRequests();
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to get coordinator counter: " + e, e);
- final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
+ GridNearTxPrepareResponse res = createPrepareResponse(e);
- // Create mini futures.
- for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
- assert !dhtMapping.empty();
+ onDone(res, res.error());
+ }
+ finally {
+ markInitialized();
+ }
+ }
+ });
+ }
+ else
+ sendPrepareRequests();
+ }
+ }
+ finally {
+ if (!skipInit)
+ markInitialized();
+ }
+ }
- ClusterNode n = dhtMapping.primary();
+ /**
+ *
+ */
+ private void sendPrepareRequests() {
+ if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
+ for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+ if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+ tx.onePhaseCommit(false);
- assert !n.isLocal();
+ break;
+ }
+ }
+ }
- GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
+ assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA;
- Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
+ int miniId = 0;
- Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
+ assert tx.transactionNodes() != null;
- if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
- continue;
+ final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
- if (tx.remainingTime() == -1)
- return;
+ // Create mini futures.
+ for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
+ assert !dhtMapping.empty();
- MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+ ClusterNode n = dhtMapping.primary();
- add(fut); // Append new future.
+ assert !n.isLocal();
- assert txNodes != null;
+ GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
- GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- futId,
- fut.futureId(),
- tx.topologyVersion(),
- tx,
- timeout,
- dhtWrites,
- nearWrites,
- txNodes,
- tx.nearXidVersion(),
- true,
- tx.onePhaseCommit(),
- tx.subjectId(),
- tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled(),
- tx.storeWriteThrough(),
- retVal);
+ Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
- int idx = 0;
+ Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
- for (IgniteTxEntry entry : dhtWrites) {
- try {
- GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
+ if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
+ continue;
- GridCacheContext<?, ?> cacheCtx = cached.context();
+ if (tx.remainingTime() == -1)
+ return;
- // Do not invalidate near entry on originating transaction node.
- req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
- cached.readerId(n.id()) != null);
+ MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+
+ add(fut); // Append new future.
+
+ assert req.transactionNodes() != null;
+
+ GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+ futId,
+ fut.futureId(),
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ dhtWrites,
+ nearWrites,
+ this.req.transactionNodes(),
+ tx.nearXidVersion(),
+ true,
+ tx.onePhaseCommit(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ tx.activeCachesDeploymentEnabled(),
+ tx.storeWriteThrough(),
+ retVal,
+ tx.mvccCoordinatorCounter());
+
+ int idx = 0;
+
+ for (IgniteTxEntry entry : dhtWrites) {
+ try {
+ GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
- if (cached.isNewLocked()) {
- List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
- tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
+ GridCacheContext<?, ?> cacheCtx = cached.context();
- // Do not preload if local node is a partition owner.
- if (!owners.contains(cctx.localNode()))
- req.markKeyForPreload(idx);
- }
+ // Do not invalidate near entry on originating transaction node.
+ req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
+ cached.readerId(n.id()) != null);
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- assert false : "Got removed exception on entry with dht local candidate: " + entry;
- }
+ if (cached.isNewLocked()) {
+ List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
+ tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
- idx++;
+ // Do not preload if local node is a partition owner.
+ if (!owners.contains(cctx.localNode()))
+ req.markKeyForPreload(idx);
}
- if (!F.isEmpty(nearWrites)) {
- for (IgniteTxEntry entry : nearWrites) {
- try {
- if (entry.explicitVersion() == null) {
- GridCacheMvccCandidate added = entry.cached().candidate(version());
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ assert false : "Got removed exception on entry with dht local candidate: " + entry;
+ }
- assert added != null : "Missing candidate for cache entry:" + entry;
- assert added.dhtLocal();
+ idx++;
+ }
- if (added.ownerVersion() != null)
- req.owned(entry.txKey(), added.ownerVersion());
- }
+ if (!F.isEmpty(nearWrites)) {
+ for (IgniteTxEntry entry : nearWrites) {
+ try {
+ if (entry.explicitVersion() == null) {
+ GridCacheMvccCandidate added = entry.cached().candidate(version());
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- assert false : "Got removed exception on entry with dht local candidate: " + entry;
- }
+ assert added != null : "Missing candidate for cache entry:" + entry;
+ assert added.dhtLocal();
+
+ if (added.ownerVersion() != null)
+ req.owned(entry.txKey(), added.ownerVersion());
}
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ assert false : "Got removed exception on entry with dht local candidate: " + entry;
}
+ }
+ }
- assert req.transactionNodes() != null;
+ assert req.transactionNodes() != null;
- try {
- cctx.io().send(n, req, tx.ioPolicy());
+ try {
+ cctx.io().send(n, req, tx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + n.id() + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + n.id() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ if (!cctx.kernalContext().isStopping()) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + n.id() + ']');
}
- catch (IgniteCheckedException e) {
- if (!cctx.kernalContext().isStopping()) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + n.id() + ']');
- }
- fut.onResult(e);
- }
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + n.id() +
- ", err=" + e + ']');
- }
- }
+ fut.onResult(e);
+ }
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + n.id() +
+ ", err=" + e + ']');
}
}
+ }
+ }
- for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
- if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
- if (tx.remainingTime() == -1)
- return;
-
- MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
-
- add(fut); // Append new future.
-
- GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- futId,
- fut.futureId(),
- tx.topologyVersion(),
- tx,
- timeout,
- null,
- nearMapping.writes(),
- tx.transactionNodes(),
- tx.nearXidVersion(),
- true,
- tx.onePhaseCommit(),
- tx.subjectId(),
- tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled(),
- tx.storeWriteThrough(),
- retVal);
-
- for (IgniteTxEntry entry : nearMapping.entries()) {
- if (CU.writes().apply(entry)) {
- try {
- if (entry.explicitVersion() == null) {
- GridCacheMvccCandidate added = entry.cached().candidate(version());
+ for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+ if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+ if (tx.remainingTime() == -1)
+ return;
- assert added != null : "Null candidate for non-group-lock entry " +
- "[added=" + added + ", entry=" + entry + ']';
- assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
- "[added=" + added + ", entry=" + entry + ']';
+ MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
+
+ add(fut); // Append new future.
+
+ GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+ futId,
+ fut.futureId(),
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ null,
+ nearMapping.writes(),
+ tx.transactionNodes(),
+ tx.nearXidVersion(),
+ true,
+ tx.onePhaseCommit(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ tx.activeCachesDeploymentEnabled(),
+ tx.storeWriteThrough(),
+ retVal,
+ tx.mvccCoordinatorCounter());
+
+ for (IgniteTxEntry entry : nearMapping.entries()) {
+ if (CU.writes().apply(entry)) {
+ try {
+ if (entry.explicitVersion() == null) {
+ GridCacheMvccCandidate added = entry.cached().candidate(version());
- if (added != null && added.ownerVersion() != null)
- req.owned(entry.txKey(), added.ownerVersion());
- }
+ assert added != null : "Null candidate for non-group-lock entry " +
+ "[added=" + added + ", entry=" + entry + ']';
+ assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+ "[added=" + added + ", entry=" + entry + ']';
- break;
- } catch (GridCacheEntryRemovedException ignore) {
- assert false : "Got removed exception on entry with dht local candidate: " + entry;
- }
+ if (added != null && added.ownerVersion() != null)
+ req.owned(entry.txKey(), added.ownerVersion());
}
+
+ break;
+ } catch (GridCacheEntryRemovedException ignore) {
+ assert false : "Got removed exception on entry with dht local candidate: " + entry;
}
+ }
+ }
- assert req.transactionNodes() != null;
+ assert req.transactionNodes() != null;
- try {
- cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
+ try {
+ cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.primary().id() + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nearMapping.primary().id() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ if (!cctx.kernalContext().isStopping()) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nearMapping.primary().id() + ']');
}
- catch (IgniteCheckedException e) {
- if (!cctx.kernalContext().isStopping()) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.primary().id() + ']');
- }
- fut.onResult(e);
- }
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.primary().id() +
- ", err=" + e + ']');
- }
- }
+ fut.onResult(e);
+ }
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nearMapping.primary().id() +
+ ", err=" + e + ']');
}
}
}
}
}
- finally {
- markInitialized();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index d334850..805c34d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -103,6 +104,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
@GridDirectTransient
private List<IgniteTxKey> nearWritesCacheMissed;
+ /** */
+ private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -141,7 +145,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
int taskNameHash,
boolean addDepInfo,
boolean storeWriteThrough,
- boolean retVal) {
+ boolean retVal,
+ long mvccCrdCntr) {
super(tx,
timeout,
null,
@@ -169,6 +174,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size());
nearNodeId = tx.nearNodeId();
+ this.mvccCrdCntr = mvccCrdCntr;
+ }
+
+ /**
+ * @return Counter.
+ */
+ public long mvccCoordinatorCounter() {
+ return mvccCrdCntr;
}
/**
@@ -407,54 +420,60 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
writer.incrementState();
case 23:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr))
return false;
writer.incrementState();
case 24:
- if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 25:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
+ if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 26:
- if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
case 27:
- if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 28:
- if (!writer.writeBitSet("preloadKeys", preloadKeys))
+ if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 29:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
case 30:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 31:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 32:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -501,7 +520,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 23:
- nearNodeId = reader.readUuid("nearNodeId");
+ mvccCrdCntr = reader.readLong("mvccCrdCntr");
if (!reader.isLastRead())
return false;
@@ -509,7 +528,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 24:
- nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -517,7 +536,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 25:
- nearXidVer = reader.readMessage("nearXidVer");
+ nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -525,7 +544,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 26:
- ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
+ nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
return false;
@@ -533,7 +552,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 27:
- ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
+ ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -541,7 +560,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 28:
- preloadKeys = reader.readBitSet("preloadKeys");
+ ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -549,7 +568,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 29:
- subjId = reader.readUuid("subjId");
+ preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
return false;
@@ -557,7 +576,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 30:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -565,6 +584,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 31:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 32:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -584,6 +611,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 32;
+ return 33;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6d85222..82bd463 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1418,6 +1418,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (err == null) {
+ cctx.coordinators().assignCoordinator(exchCtx.events().discoveryCache());
+
if (centralizedAff) {
assert !exchCtx.mergeExchanges();
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index d017d7d..0cccce3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -262,6 +263,18 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
AffinityTopologyVersion topVer = tx.topologyVersion();
+ ClusterNode mvccCrd = null;
+
+ if (tx.txState().mvccEnabled(cctx)) {
+ mvccCrd = cctx.coordinators().coordinator(topVer);
+
+ if (mvccCrd == null) {
+ onDone(new ClusterTopologyCheckedException("Mvcc coordinator is not assigned: " + topVer));
+
+ return;
+ }
+ }
+
GridDhtTxMapping txMapping = new GridDhtTxMapping();
boolean hasNearCache = false;
@@ -326,6 +339,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
for (final GridDistributedTxMapping m : mappings.values()) {
final ClusterNode primary = m.primary();
+ boolean needCntr = false;
+
+ if (mvccCrd != null) {
+ if (tx.onePhaseCommit() || mvccCrd.equals(primary)) {
+ needCntr = true;
+
+ mvccCrd = null;
+ }
+ }
+
if (primary.isLocal()) {
if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(),
@@ -334,6 +357,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
m.nearEntriesReads(),
m.nearEntriesWrites());
+ nearReq.requestMvccCounter(needCntr);
+
prepareLocal(nearReq, m, ++miniId, true);
GridNearTxPrepareRequest colocatedReq = createRequest(txNodes,
@@ -347,6 +372,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
else {
GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes());
+ req.requestMvccCounter(needCntr);
+
prepareLocal(req, m, ++miniId, m.hasNearCacheEntries());
}
}
@@ -357,6 +384,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
m.reads(),
m.writes());
+ req.requestMvccCounter(needCntr);
+
final MiniFuture fut = new MiniFuture(m, ++miniId);
req.miniId(fut.futureId());
@@ -389,6 +418,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
}
+ if (mvccCrd != null) {
+ assert !tx.onePhaseCommit();
+
+ IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, tx);
+
+ add((IgniteInternalFuture)cntrFut);
+ }
+
markInitialized();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c45eb7b..e093eeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -847,7 +848,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
0,
tx.activeCachesDeploymentEnabled(),
!waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
- waitRemoteTxs);
+ waitRemoteTxs,
+ TxMvccVersion.COUNTER_NA);
finishReq.checkCommitted(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 55d6bdd..8ecf21f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -105,14 +105,12 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
-import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
-import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
@@ -3338,19 +3336,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
/**
* Prepares next batch of entries in dht transaction.
*
- * @param reads Read entries.
- * @param writes Write entries.
- * @param txNodes Transaction nodes mapping.
- * @param last {@code True} if this is last prepare request.
+ * @param req Prepare request.
* @return Future that will be completed when locks are acquired.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
- @Nullable Collection<IgniteTxEntry> reads,
- @Nullable Collection<IgniteTxEntry> writes,
- Map<UUID, Collection<UUID>> txNodes,
- boolean last
- ) {
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(GridNearTxPrepareRequest req) {
long timeout = remainingTime();
if (state() != PREPARING) {
@@ -3375,11 +3365,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
timeout,
0,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
- last,
+ req.last(),
needReturnValue() && implicit());
try {
- userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes);
+ userPrepare((serializable() && optimistic()) ? F.concat(false, req.writes(), req.reads()) : req.writes());
// Make sure to add future before calling prepare on it.
cctx.mvcc().addFuture(fut);
@@ -3387,7 +3377,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
if (isSystemInvalidate())
fut.complete();
else
- fut.prepare(reads, writes, txNodes);
+ fut.prepare(req);
}
catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
fut.onError(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index e352c87..e1c6636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -57,6 +57,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10;
+ /** */
+ private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x02;
+
/** Future ID. */
private IgniteUuid futId;
@@ -149,6 +152,20 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
/**
+ * @return {@code True} if need request MVCC counter on primary node on prepare step.
+ */
+ public boolean requestMvccCounter() {
+ return isFlag(REQUEST_MVCC_CNTR_FLAG_MASK);
+ }
+
+ /**
+ * @param val {@code True} if need request MVCC counter on primary node on prepare step.
+ */
+ public void requestMvccCounter(boolean val) {
+ setFlag(val, REQUEST_MVCC_CNTR_FLAG_MASK);
+ }
+
+ /**
* @return {@code True} if it is safe for first client request to wait for topology future
* completion.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 8162168..4233371 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -97,6 +98,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** Not {@code null} if client node should remap transaction. */
private AffinityTopologyVersion clientRemapVer;
+ /** */
+ private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -146,6 +150,20 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
/**
+ * @param mvccCrdCntr Counter.
+ */
+ public void mvccCoordinatorCounter(long mvccCrdCntr) {
+ this.mvccCrdCntr = mvccCrdCntr;
+ }
+
+ /**
+ * @return Counter.
+ */
+ public long mvccCoordinatorCounter() {
+ return mvccCrdCntr;
+ }
+
+ /**
* @return One-phase commit state on primary node.
*/
public boolean onePhaseCommit() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index e5d07ea..ec29002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.mvcc;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -31,9 +30,12 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -47,6 +49,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
*/
public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
/** */
+ private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory();
+
+ /** */
private final AtomicLong mvccCntr = new AtomicLong(0L);
/** */
@@ -74,20 +79,28 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
cctx.gridIO().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener());
}
+ public long requestTxCounterOnCoordinator(GridCacheVersion txVer) {
+ assert cctx.localNode().equals(assignHist.currentCoordinator());
+
+ return assignTxCounter(txVer);
+ }
+
/**
* @param crd Coordinator.
- * @param txId Transaction ID.
+ * @param tx Transaction.
* @return Counter request future.
*/
- public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, GridCacheVersion txId) {
- MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd);
+ public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, IgniteInternalTx tx) {
+ assert !crd.isLocal() : crd;
+
+ MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, tx);
cntrFuts.put(fut.id, fut);
try {
cctx.gridIO().sendToGridTopic(crd,
TOPIC_CACHE_COORDINATOR,
- new CoordinatorTxCounterRequest(fut.id, txId),
+ new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion()),
SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
@@ -98,8 +111,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
return fut;
}
+ /**
+ * @param crd Coordinator.
+ * @return Counter request future.
+ */
public IgniteInternalFuture<Long> requestQueryCounter(ClusterNode crd) {
- MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd);
+ MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, null);
cntrFuts.put(fut.id, fut);
@@ -118,6 +135,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
}
/**
+ * @param crd Coordinator.
* @param txId Transaction ID.
* @return Acknowledge future.
*/
@@ -144,6 +162,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
return fut;
}
+ /**
+ * @param crd Coordinator.
+ * @param txId Transaction ID.
+ */
public void ackTxRollback(ClusterNode crd, GridCacheVersion txId) {
CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, txId);
@@ -329,14 +351,32 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
}
/**
- * @param discoCache Cluster topology.
- * @return Assigned coordinator.
+ * @param topVer Topology version.
+ * @return MVCC coordinator for given topology version.
*/
- @Nullable public ClusterNode assignCoordinator(DiscoCache discoCache) {
- // TODO IGNITE-3478
- List<ClusterNode> srvNodes = discoCache.serverNodes();
+ @Nullable public ClusterNode coordinator(AffinityTopologyVersion topVer) {
+ return assignHist.coordinator(topVer);
+ }
+
+ /**
+ * @param discoCache Discovery snapshot.
+ */
+ public void assignCoordinator(DiscoCache discoCache) {
+ ClusterNode curCrd = assignHist.currentCoordinator();
+
+ if (curCrd == null || !discoCache.allNodes().contains(curCrd)) {
+ ClusterNode newCrd = null;
+
+ if (!discoCache.serverNodes().isEmpty())
+ newCrd = discoCache.serverNodes().get(0);
- return srvNodes.isEmpty() ? null : srvNodes.get(0);
+ if (!F.eq(curCrd, newCrd)) {
+ assignHist.addAssignment(discoCache.version(), newCrd);
+
+ log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() +
+ ", crd=" + newCrd + ']');
+ }
+ }
}
/**
@@ -347,21 +387,30 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
private final Long id;
/** */
+ private IgniteInternalTx tx;
+
+ /** */
private final ClusterNode crd;
/**
* @param id Future ID.
* @param crd Coordinator.
*/
- MvccCounterFuture(Long id, ClusterNode crd) {
+ MvccCounterFuture(Long id, ClusterNode crd, IgniteInternalTx tx) {
this.id = id;
this.crd = crd;
+ this.tx = tx;
}
/**
* @param cntr Counter.
*/
void onResponse(long cntr) {
+ assert cntr != TxMvccVersion.COUNTER_NA;
+
+ if (tx != null)
+ tx.mvccCoordinatorCounter(cntr);
+
onDone(cntr);
}
@@ -448,7 +497,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
else if (msg instanceof CoordinatorQueryCounterRequest)
processCoordinatorQueryStateRequest(nodeId, (CoordinatorQueryCounterRequest)msg);
else
- U.warn(log, "Unexpected message received: " + msg);
+ U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
}
}
}