You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/09 13:27:24 UTC
[13/18] ignite git commit: Performance optimizations.
Performance optimizations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4848a70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4848a70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4848a70
Branch: refs/heads/ignite-426-2
Commit: a4848a702fea1573af7f36af91d02f7df3ab64f4
Parents: 7393227
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 9 12:16:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 9 12:16:16 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoMessage.java | 4 +-
.../processors/cache/GridCacheContext.java | 29 +--
.../processors/cache/GridCacheEntryEx.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 55 ++--
.../processors/cache/GridCacheMvccManager.java | 145 +++++------
.../distributed/GridDistributedCacheEntry.java | 2 +-
.../distributed/GridDistributedTxMapping.java | 8 +-
.../GridDistributedTxRemoteAdapter.java | 5 +-
.../distributed/dht/GridDhtLockFuture.java | 7 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 13 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 43 ++--
.../distributed/dht/GridDhtTxPrepareFuture.java | 78 +++---
.../cache/distributed/dht/GridDhtTxRemote.java | 45 ++--
.../dht/atomic/GridDhtAtomicCache.java | 1 -
.../dht/colocated/GridDhtColocatedCache.java | 7 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 11 +-
.../near/GridNearTransactionalCache.java | 7 +-
.../near/GridNearTxFinishFuture.java | 157 ++++++------
.../cache/distributed/near/GridNearTxLocal.java | 21 +-
.../cache/transactions/IgniteInternalTx.java | 3 +-
.../cache/transactions/IgniteTxAdapter.java | 251 ++++++++++---------
.../cache/transactions/IgniteTxHandler.java | 37 +--
.../transactions/IgniteTxLocalAdapter.java | 26 +-
.../cache/transactions/IgniteTxManager.java | 171 ++++++-------
.../GridBoundedConcurrentLinkedHashMap.java | 7 +-
.../GridBoundedConcurrentLinkedHashSet.java | 7 +-
.../util/GridBoundedConcurrentOrderedMap.java | 39 +--
.../internal/util/GridConcurrentFactory.java | 11 +-
.../util/GridConcurrentLinkedHashSet.java | 9 +-
.../ignite/internal/util/IgniteUuidCache.java | 6 +-
.../util/future/GridCompoundFuture.java | 155 ++++++++----
.../java/org/jsr166/ConcurrentHashMap8.java | 2 +-
.../java/org/jsr166/ConcurrentLinkedDeque8.java | 2 +-
.../org/jsr166/ConcurrentLinkedHashMap.java | 195 +++++++++++---
.../GridCacheAffinityBackupsSelfTest.java | 8 +
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
.../GridCacheMissingCommitVersionSelfTest.java | 40 +--
.../processors/cache/GridCacheTestEntryEx.java | 3 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 2 +-
.../continuous/GridEventConsumeSelfTest.java | 2 +-
...dBoundedConcurrentLinkedHashMapSelfTest.java | 2 +-
.../GridConcurrentLinkedHashMapSelfTest.java | 62 ++++-
.../junits/common/GridCommonAbstractTest.java | 4 +-
...rrentLinkedHashMapMultiThreadedSelfTest.java | 104 ++++----
.../yardstick/cache/IgnitePutTxBenchmark.java | 10 +
.../cache/IgnitePutTxPrimaryOnlyBenchmark.java | 65 +++++
.../IgnitePutTxSkipLocalBackupBenchmark.java | 65 +++++
.../cache/WaitMapExchangeFinishCallable.java | 95 +++++++
48 files changed, 1220 insertions(+), 807 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index c83a281..cb19ba0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -143,7 +143,7 @@ public class GridIoMessage implements Message {
/**
* @return Message.
*/
- public Object message() {
+ public Message message() {
return msg;
}
@@ -320,4 +320,4 @@ public class GridIoMessage implements Message {
@Override public String toString() {
return S.toString(GridIoMessage.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f4852c..ee4da46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1488,10 +1488,9 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param log Log.
* @param dhtMap Dht mappings.
* @param nearMap Near mappings.
- * @return {@code True} if mapped.
* @throws GridCacheEntryRemovedException If reader for entry is removed.
*/
- public boolean dhtMap(
+ public void dhtMap(
UUID nearNodeId,
AffinityTopologyVersion topVer,
GridDhtCacheEntry entry,
@@ -1509,7 +1508,7 @@ public class GridCacheContext<K, V> implements Externalizable {
Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, F.remoteNodes(nodeId())); // Exclude local node.
- boolean ret = map(entry, dhtRemoteNodes, dhtMap);
+ map(entry, dhtRemoteNodes, dhtMap);
Collection<ClusterNode> nearRemoteNodes = null;
@@ -1530,7 +1529,7 @@ public class GridCacheContext<K, V> implements Externalizable {
if (nearNodes != null && !nearNodes.isEmpty()) {
nearRemoteNodes = F.view(nearNodes, F.notIn(dhtNodes));
- ret |= map(entry, nearRemoteNodes, nearMap);
+ map(entry, nearRemoteNodes, nearMap);
}
}
@@ -1540,8 +1539,6 @@ public class GridCacheContext<K, V> implements Externalizable {
entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds);
}
-
- return ret;
}
/**
@@ -1549,10 +1546,9 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param log Log.
* @param dhtMap Dht mappings.
* @param nearMap Near mappings.
- * @return {@code True} if mapped.
* @throws GridCacheEntryRemovedException If reader for entry is removed.
*/
- public boolean dhtMap(
+ public void dhtMap(
GridDhtCacheEntry entry,
GridCacheVersion explicitLockVer,
IgniteLogger log,
@@ -1571,27 +1567,20 @@ public class GridCacheContext<K, V> implements Externalizable {
Collection<ClusterNode> nearNodes = cand.mappedNearNodes();
- boolean ret = map(entry, dhtNodes, dhtMap);
+ map(entry, dhtNodes, dhtMap);
if (nearNodes != null && !nearNodes.isEmpty())
- ret |= map(entry, nearNodes, nearMap);
-
- return ret;
+ map(entry, nearNodes, nearMap);
}
-
- return false;
}
/**
* @param entry Entry.
* @param nodes Nodes.
* @param map Map.
- * @return {@code True} if mapped.
*/
- private boolean map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes,
+ private void map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes,
Map<ClusterNode, List<GridDhtCacheEntry>> map) {
- boolean ret = false;
-
if (nodes != null) {
for (ClusterNode n : nodes) {
List<GridDhtCacheEntry> entries = map.get(n);
@@ -1600,12 +1589,8 @@ public class GridCacheContext<K, V> implements Externalizable {
map.put(n, entries = new LinkedList<>());
entries.add(entry);
-
- ret = true;
}
}
-
- return ret;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 50b01c8..af62e39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -389,7 +389,6 @@ public interface GridCacheEntryEx {
* @param tx Cache transaction.
* @param evtNodeId ID of node responsible for this change.
* @param affNodeId Partitioned node iD.
- * @param writeThrough If {@code true}, persist to the storage.
* @param retval {@code True} if value should be returned (and unmarshalled if needed).
* @param evt Flag to signal event notification.
* @param metrics Flag to signal metrics notification.
@@ -409,7 +408,6 @@ public interface GridCacheEntryEx {
@Nullable IgniteInternalTx tx,
UUID evtNodeId,
UUID affNodeId,
- boolean writeThrough,
boolean retval,
boolean evt,
boolean metrics,
@@ -1014,4 +1012,4 @@ public interface GridCacheEntryEx {
* Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
*/
public void onUnlock();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ca0995a..df9f5c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1108,7 +1108,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheLazyEntry e = new CacheLazyEntry(cctx, key, old);
- Object interceptorVal = cctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(cctx, key, old),
+ Object interceptorVal = cctx.config().getInterceptor().onBeforePut(
+ new CacheLazyEntry(cctx, key, old),
val0);
key0 = e.key();
@@ -1212,7 +1213,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteInternalTx tx,
UUID evtNodeId,
UUID affNodeId,
- boolean writeThrough,
boolean retval,
boolean evt,
boolean metrics,
@@ -1244,6 +1244,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Cache.Entry entry0 = null;
+ boolean deferred;
+
+ boolean marked = false;
+
synchronized (this) {
checkObsolete();
@@ -1349,40 +1353,33 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
cctx.dataStructures().onEntryUpdated(key, true);
- }
-
- // Persist outside of synchronization. The correctness of the
- // value will be handled by current transaction.
- if (writeThrough)
- cctx.store().remove(tx, keyValue(false));
- if (cctx.deferredDelete() && !detached() && !isInternal())
- cctx.onDeferredDelete(this, newVer);
- else {
- boolean marked = false;
+ deferred = cctx.deferredDelete() && !detached() && !isInternal();
- synchronized (this) {
+ if (!deferred) {
// If entry is still removed.
- if (newVer == ver) {
- if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
- if (log.isDebugEnabled())
- log.debug("Entry could not be marked obsolete (it is still used): " + this);
- }
- else {
- recordNodeId(affNodeId, topVer);
+ assert newVer == ver;
- // If entry was not marked obsolete, then removed lock
- // will be registered whenever removeLock is called.
- cctx.mvcc().addRemoved(cctx, obsoleteVer);
+ if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
+ if (log.isDebugEnabled())
+ log.debug("Entry could not be marked obsolete (it is still used): " + this);
+ }
+ else {
+ recordNodeId(affNodeId, topVer);
- if (log.isDebugEnabled())
- log.debug("Entry was marked obsolete: " + this);
- }
+ if (log.isDebugEnabled())
+ log.debug("Entry was marked obsolete: " + this);
}
}
+ }
- if (marked)
- onMarkedObsolete();
+ if (deferred)
+ cctx.onDeferredDelete(this, newVer);
+
+ if (marked) {
+ assert !deferred;
+
+ onMarkedObsolete();
}
if (intercept)
@@ -4247,4 +4244,4 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return "IteratorEntry [key=" + key + ']';
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0960c9d..2c14209 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -24,7 +25,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
@@ -88,7 +89,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit;
/** Set of removed lock versions. */
- private Collection<GridCacheVersion> rmvLocks =
+ private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks =
new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q);
/** Current local candidates. */
@@ -114,7 +115,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
/** Finish futures. */
- private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+ private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
/** Logger. */
@SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -143,17 +144,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
Collection<? extends GridCacheFuture> futCol = futs.get(owner.version());
if (futCol != null) {
- for (GridCacheFuture fut : futCol) {
- if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
- GridCacheMvccFuture<Boolean> mvccFut =
- (GridCacheMvccFuture<Boolean>)fut;
-
- // Since this method is called outside of entry synchronization,
- // we can safely invoke any method on the future.
- // Also note that we don't remove future here if it is done.
- // The removal is initiated from within future itself.
- if (mvccFut.onOwnerChanged(entry, owner))
- return;
+ synchronized (futCol) {
+ for (GridCacheFuture fut : futCol) {
+ if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
+ GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
+
+ // Since this method is called outside of entry synchronization,
+ // we can safely invoke any method on the future.
+ // Also note that we don't remove future here if it is done.
+ // The removal is initiated from within future itself.
+ if (mvccFut.onOwnerChanged(entry, owner))
+ return;
+ }
}
}
}
@@ -171,8 +173,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
else if (log.isDebugEnabled())
log.debug("Failed to find transaction for changed owner: " + owner);
- for (FinishLockFuture f : finishFuts)
- f.recheck(entry);
+ if (!finishFuts.isEmptyx()) {
+ for (FinishLockFuture f : finishFuts)
+ f.recheck(entry);
+ }
}
/** {@inheritDoc} */
@@ -203,21 +207,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Processing node left [nodeId=" + discoEvt.eventNode().id() + "]");
- for (Collection<GridCacheFuture<?>> futsCol : futs.values()) {
- for (GridCacheFuture<?> fut : futsCol) {
- if (!fut.trackable()) {
- if (log.isDebugEnabled())
- log.debug("Skipping non-trackable future: " + fut);
-
- continue;
- }
-
- fut.onNodeLeft(discoEvt.eventNode().id());
-
- if (fut.isCancelled() || fut.isDone())
- removeFuture(fut);
- }
- }
+ for (GridCacheFuture<?> fut : activeFutures())
+ fut.onNodeLeft(discoEvt.eventNode().id());
for (IgniteInternalFuture<?> fut : atomicFuts.values()) {
if (fut instanceof GridCacheFuture) {
@@ -272,7 +263,15 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @return Collection of active futures.
*/
public Collection<GridCacheFuture<?>> activeFutures() {
- return F.flatCollections(futs.values());
+ ArrayList<GridCacheFuture<?>> col = new ArrayList<>();
+
+ for (Collection<GridCacheFuture<?>> verFuts : futs.values()) {
+ synchronized (verFuts) {
+ col.addAll(verFuts);
+ }
+ }
+
+ return col;
}
/**
@@ -345,10 +344,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param err Error.
*/
private void cancelClientFutures(IgniteCheckedException err) {
- for (Collection<GridCacheFuture<?>> futures : futs.values()) {
- for (GridCacheFuture<?> future : futures)
- ((GridFutureAdapter)future).onDone(err);
- }
+ for (GridCacheFuture<?> fut : activeFutures())
+ ((GridFutureAdapter)fut).onDone(err);
for (GridCacheAtomicFuture<?> future : atomicFuts.values())
((GridFutureAdapter)future).onDone(err);
@@ -444,11 +441,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return true;
while (true) {
- Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(),
- new ConcurrentLinkedDeque8<GridCacheFuture<?>>() {
- /** */
- private int hash;
+ Collection<GridCacheFuture<?>> old = futs.get(fut.version());
+ if (old == null) {
+ Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) {
{
// Make sure that we add future to queue before
// adding queue to the map of futures.
@@ -456,16 +452,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
@Override public int hashCode() {
- if (hash == 0)
- hash = System.identityHashCode(this);
-
- return hash;
+ return System.identityHashCode(this);
}
@Override public boolean equals(Object obj) {
return obj == this;
}
- });
+ };
+
+ old = futs.putIfAbsent(fut.version(), col);
+ }
if (old != null) {
boolean empty, dup = false;
@@ -474,10 +470,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
empty = old.isEmpty();
if (!empty)
- dup = old.contains(fut);
-
- if (!empty && !dup)
- old.add(fut);
+ dup = !old.add(fut);
}
// Future is being removed, so we force-remove here and try again.
@@ -594,14 +587,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
- if (futs != null)
- for (GridCacheFuture<?> fut : futs)
- if (fut.futureId().equals(futId)) {
- if (log.isDebugEnabled())
- log.debug("Found future in futures map: " + fut);
+ if (futs != null) {
+ synchronized (futs) {
+ for (GridCacheFuture<?> fut : futs) {
+ if (fut.futureId().equals(futId)) {
+ if (log.isDebugEnabled())
+ log.debug("Found future in futures map: " + fut);
- return fut;
+ return fut;
+ }
}
+ }
+ }
if (log.isDebugEnabled())
log.debug("Failed to find future in futures map [ver=" + ver + ", futId=" + futId + ']');
@@ -619,7 +616,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) {
Collection c = futs.get(ver);
- return c == null ? Collections.<IgniteInternalFuture<T>>emptyList() : (Collection<IgniteInternalFuture<T>>)c;
+ if (c == null)
+ return Collections.<IgniteInternalFuture<T>>emptyList();
+ else {
+ synchronized (c) {
+ return new ArrayList<>((Collection<IgniteInternalFuture<T>>)c);
+ }
+ }
}
/**
@@ -949,12 +952,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@Override public void printMemoryStats() {
X.println(">>> ");
X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
- X.println(">>> rmvLocksSize: " + rmvLocks.size());
+ X.println(">>> rmvLocksSize: " + rmvLocks.sizex());
X.println(">>> dhtLocCandsSize: " + dhtLocCands.size());
X.println(">>> lockedSize: " + locked.size());
X.println(">>> futsSize: " + futs.size());
X.println(">>> near2dhtSize: " + near2dht.size());
- X.println(">>> finishFutsSize: " + finishFuts.size());
+ X.println(">>> finishFutsSize: " + finishFuts.sizex());
}
/**
@@ -974,9 +977,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
- for (FinishLockFuture fut : finishFuts) {
- if (fut.topologyVersion().equals(topVer))
- cands.putAll(fut.pendingLocks());
+ if (!finishFuts.isEmptyx()) {
+ for (FinishLockFuture fut : finishFuts) {
+ if (fut.topologyVersion().equals(topVer))
+ cands.putAll(fut.pendingLocks());
+ }
}
return cands;
@@ -1054,8 +1059,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param topVer Topology version.
* @return Future that signals when all locks for given partitions will be released.
*/
- private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<GridDistributedCacheEntry> filter,
- AffinityTopologyVersion topVer) {
+ private IgniteInternalFuture<?> finishLocks(
+ @Nullable final IgnitePredicate<GridDistributedCacheEntry> filter,
+ AffinityTopologyVersion topVer
+ ) {
assert topVer.topologyVersion() != 0;
if (topVer.equals(AffinityTopologyVersion.NONE))
@@ -1069,10 +1076,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> e) {
finishFuts.remove(finishFut);
-
- // This call is required to make sure that the concurrent queue
- // clears memory occupied by internal nodes.
- finishFuts.peek();
}
});
@@ -1088,8 +1091,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (exchLog.isDebugEnabled())
exchLog.debug("Rechecking pending locks for completion.");
- for (FinishLockFuture fut : finishFuts)
- fut.recheck();
+ if (!finishFuts.isEmptyx()) {
+ for (FinishLockFuture fut : finishFuts)
+ fut.recheck();
+ }
}
/**
@@ -1250,4 +1255,4 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return S.toString(FinishLockFuture.class, this, super.toString());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index a138d30..a3eb723 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -403,7 +403,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
doomed = mvcc == null ? null : mvcc.candidate(ver);
- if (doomed == null || doomed.dhtLocal() || (!doomed.local() && !doomed.nearLocal()))
+ if (doomed == null)
addRemoved(ver);
GridCacheVersion obsoleteVer = obsoleteVersionExtras();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 1e78ba2..2d2d935 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -23,13 +23,13 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -87,7 +87,7 @@ public class GridDistributedTxMapping implements Externalizable {
public GridDistributedTxMapping(ClusterNode node) {
this.node = node;
- entries = new GridConcurrentLinkedHashSet<>();
+ entries = new LinkedHashSet<>();
}
/**
@@ -297,7 +297,7 @@ public class GridDistributedTxMapping implements Externalizable {
*/
private void ensureModifiable() {
if (readOnly) {
- entries = new GridConcurrentLinkedHashSet<>(entries);
+ entries = new LinkedHashSet<>(entries);
readOnly = false;
}
@@ -330,4 +330,4 @@ public class GridDistributedTxMapping implements Externalizable {
@Override public String toString() {
return S.toString(GridDistributedTxMapping.class, this, "node", node.id());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index fcbf58d..93303c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -583,13 +583,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
eventNodeId(),
nodeId,
false,
- false,
true,
true,
topVer,
null,
replicate ? DR_BACKUP : DR_NONE,
- near() ? null : explicitVer, CU.subjectId(this, cctx),
+ near() ? null : explicitVer,
+ CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer);
else {
@@ -629,7 +629,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
eventNodeId(),
nodeId,
false,
- false,
true,
true,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c175b0b..579d701 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -782,14 +782,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (log.isDebugEnabled())
log.debug("Mapping entry for DHT lock future: " + this);
- boolean hasRmtNodes = false;
-
// Assign keys to primary nodes.
for (GridDhtCacheEntry entry : entries) {
try {
while (true) {
try {
- hasRmtNodes = cctx.dhtMap(
+ cctx.dhtMap(
nearNodeId,
topVer,
entry,
@@ -823,9 +821,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
}
}
- if (tx != null)
- tx.needsCompletedVersions(hasRmtNodes);
-
if (isDone()) {
if (log.isDebugEnabled())
log.debug("Mapping won't proceed because future is done: " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 4ce4759..3069afd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -1090,8 +1090,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
// We have to add completed versions for cases when nearLocal and remote transactions
// execute concurrently.
- res.completedVersions(ctx.tm().committedVersions(req.version()),
- ctx.tm().rolledbackVersions(req.version()));
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(req.version());
+
+ res.completedVersions(versPair.get1(), versPair.get2());
int i = 0;
@@ -1510,8 +1511,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
}
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+ Collection<GridCacheVersion> committed = versPair.get1();
+ Collection<GridCacheVersion> rolledback = versPair.get2();
// Backups.
for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8c7d985..6de8795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -84,7 +83,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
/** Mapped flag. */
- protected AtomicBoolean mapped = new AtomicBoolean();
+ protected volatile boolean mapped;
/** */
private long dhtThreadId;
@@ -92,9 +91,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/** */
protected boolean explicitLock;
- /** */
- private boolean needsCompletedVers;
-
/** Versions of pending locks for entries of this tx. */
private Collection<GridCacheVersion> pendingVers;
@@ -141,20 +137,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
int taskNameHash
) {
super(
- cctx,
- xidVer,
- implicit,
- implicitSingle,
- sys,
- plc,
- concurrency,
- isolation,
- timeout,
+ cctx,
+ xidVer,
+ implicit,
+ implicitSingle,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ timeout,
invalidate,
storeEnabled,
onePhaseCommit,
- txSize,
- subjId,
+ txSize,
+ subjId,
taskNameHash
);
@@ -244,16 +240,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
*/
protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err);
- /**
- * @param needsCompletedVers {@code True} if needs completed versions.
- */
- public void needsCompletedVersions(boolean needsCompletedVers) {
- this.needsCompletedVers |= needsCompletedVers;
- }
-
/** {@inheritDoc} */
@Override public boolean needsCompletedVersions() {
- return needsCompletedVers;
+ return nearOnOriginatingNode;
}
/**
@@ -281,10 +270,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
* Map explicit locks.
*/
protected void mapExplicitLocks() {
- if (!mapped.get()) {
+ if (!mapped) {
// Explicit locks may participate in implicit transactions only.
if (!implicit()) {
- mapped.set(true);
+ mapped = true;
return;
}
@@ -343,7 +332,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
if (!F.isEmpty(nearEntryMap))
addNearNodeEntryMapping(nearEntryMap);
- mapped.set(true);
+ mapped = true;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 61975d7..1d6f633 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,10 +60,10 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -177,7 +179,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Keys that should be locked. */
@GridToStringInclude
- private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ private final Set<IgniteTxKey> lockKeys = new HashSet<>();
/** Force keys future for correct transforms. */
private IgniteInternalFuture<?> forceKeysFut;
@@ -267,7 +269,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (log.isDebugEnabled())
log.debug("Transaction future received owner changed callback: " + entry);
- boolean rmv = lockKeys.remove(entry.txKey());
+ boolean rmv;
+
+ synchronized (lockKeys) {
+ rmv = lockKeys.remove(entry.txKey());
+ }
return rmv && mapIfLocked();
}
@@ -293,7 +299,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return {@code True} if all locks are owned.
*/
private boolean checkLocks() {
- return locksReady && lockKeys.isEmpty();
+ if (!locksReady)
+ return false;
+
+ synchronized (lockKeys) {
+ return lockKeys.isEmpty();
+ }
}
/** {@inheritDoc} */
@@ -495,8 +506,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
txEntry.cached(entry);
}
- if (tx.optimistic() && txEntry.explicitVersion() == null)
- lockKeys.add(txEntry.txKey());
+ if (tx.optimistic() && txEntry.explicitVersion() == null) {
+ synchronized (lockKeys) {
+ lockKeys.add(txEntry.txKey());
+ }
+ }
while (true) {
try {
@@ -689,7 +703,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheVersion min = tx.minVersion();
- res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min));
+ if (tx.needsCompletedVersions()) {
+ IgnitePair<Collection<GridCacheVersion>> versPair = cctx.tm().versions(min);
+
+ res.completedVersions(versPair.get1(), versPair.get2());
+ }
res.pending(localDhtPendingVersions(tx.writeEntries(), min));
@@ -987,21 +1005,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (err0 != null) {
err.compareAndSet(null, err0);
+ tx.rollbackAsync();
+
final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
- tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
- if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) {
- try {
- if (replied.compareAndSet(false, true))
- sendPrepareResponse(res);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response for transaction: " + tx, e);
- }
- }
- }
- });
+ onDone(res, res.error());
return;
}
@@ -1017,20 +1025,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>();
Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>();
- boolean hasRemoteNodes = false;
-
// Assign keys to primary nodes.
if (!F.isEmpty(writes)) {
for (IgniteTxEntry write : writes)
- hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
+ map(tx.entry(write.txKey()), futDhtMap, futNearMap);
}
if (!F.isEmpty(reads)) {
for (IgniteTxEntry read : reads)
- hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
+ map(tx.entry(read.txKey()), futDhtMap, futNearMap);
}
-
- tx.needsCompletedVersions(hasRemoteNodes);
}
if (isDone())
@@ -1223,15 +1227,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param entry Transaction entry.
* @param futDhtMap DHT mapping.
* @param futNearMap Near mapping.
- * @return {@code True} if mapped.
*/
- private boolean map(
+ private void map(
IgniteTxEntry entry,
Map<UUID, GridDistributedTxMapping> futDhtMap,
Map<UUID, GridDistributedTxMapping> futNearMap
) {
if (entry.cached().isLocal())
- return false;
+ return;
GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
@@ -1247,8 +1250,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
entry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
}
- boolean ret;
-
while (true) {
try {
Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
@@ -1272,10 +1273,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
log.debug("Entry has no near readers: " + entry);
// Exclude local node.
- ret = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
+ map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
// Exclude DHT nodes.
- ret |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
+ map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
break;
}
@@ -1285,8 +1286,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
entry.cached(cached);
}
}
-
- return ret;
}
/**
@@ -1294,16 +1293,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param nodes Nodes.
* @param globalMap Map.
* @param locMap Exclude map.
- * @return {@code True} if mapped.
*/
- private boolean map(
+ private void map(
IgniteTxEntry entry,
Iterable<ClusterNode> nodes,
Map<UUID, GridDistributedTxMapping> globalMap,
Map<UUID, GridDistributedTxMapping> locMap
) {
- boolean ret = false;
-
if (nodes != null) {
for (ClusterNode n : nodes) {
GridDistributedTxMapping global = globalMap.get(n.id());
@@ -1332,12 +1328,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
locMap.put(n.id(), loc = new GridDistributedTxMapping(n));
loc.add(entry);
-
- ret = true;
}
}
-
- return ret;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index f8be2a7..e268a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -112,19 +113,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
int taskNameHash
) {
super(
- ctx,
- nodeId,
- rmtThreadId,
- xidVer,
- commitVer,
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
- timeout,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
txSize,
- subjId,
+ subjId,
taskNameHash
);
@@ -138,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
readMap = Collections.emptyMap();
- writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+ writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
topologyVersion(topVer);
}
@@ -183,19 +184,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
int taskNameHash
) {
super(
- ctx,
- nodeId,
- rmtThreadId,
- xidVer,
- commitVer,
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
- timeout,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
txSize,
- subjId,
+ subjId,
taskNameHash
);
@@ -207,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
this.rmtFutId = rmtFutId;
readMap = Collections.emptyMap();
- writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+ writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
topologyVersion(topVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4cd9e84..7f9edb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f03b461..83c220d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
@@ -688,8 +689,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (map == null || map.isEmpty())
return;
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+ Collection<GridCacheVersion> committed = versPair.get1();
+ Collection<GridCacheVersion> rolledback = versPair.get2();
for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
ClusterNode n = mapping.getKey();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index af43113..0002180 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
+import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
@@ -55,7 +57,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -293,7 +294,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
txMapping = new GridDhtTxMapping();
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
+ Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
if (!F.isEmpty(writes)) {
for (int cacheId : tx.activeCacheIds()) {
@@ -353,7 +354,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*
* @param mappings Queue of mappings.
*/
- private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
+ private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) {
if (isDone())
return;
@@ -556,7 +557,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
private AtomicBoolean rcvRes = new AtomicBoolean(false);
/** Mappings to proceed prepare. */
- private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+ private Queue<GridDistributedTxMapping> mappings;
/**
* @param m Mapping.
@@ -564,7 +565,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*/
MiniFuture(
GridDistributedTxMapping m,
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+ Queue<GridDistributedTxMapping> mappings
) {
this.m = m;
this.mappings = mappings;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 0e8aa0d..5ab85b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -712,8 +713,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (map == null || map.isEmpty())
return;
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+ Collection<GridCacheVersion> committed = versPair.get1();
+ Collection<GridCacheVersion> rolledback = versPair.get2();
for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
ClusterNode n = mapping.getKey();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 46c9f3e..a9dbda2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -88,15 +87,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/** Commit flag. */
private boolean commit;
- /** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
-
/** Node mappings. */
- private ConcurrentMap<UUID, GridDistributedTxMapping> mappings;
+ private Map<UUID, GridDistributedTxMapping> mappings;
/** Trackable flag. */
private boolean trackable = true;
+ /** */
+ private boolean finishOnePhaseCalled;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -176,38 +175,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- * @param e Error.
- */
- void onError(Throwable e) {
- tx.commitError(e);
-
- if (err.compareAndSet(null, e)) {
- boolean marked = tx.setRollbackOnly();
-
- if (e instanceof IgniteTxRollbackCheckedException) {
- if (marked) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
- }
- }
- }
- else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error.
- try {
- tx.close();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to invalidate transaction: " + tx, ex);
- }
- }
-
- onComplete();
- }
- }
-
- /**
* @param nodeId Sender.
* @param res Result.
*/
@@ -247,24 +214,56 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/** {@inheritDoc} */
@Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
- if ((initialized() || err != null)) {
- if (tx.needCheckBackup()) {
- assert tx.onePhaseCommit();
+ if (isDone())
+ return false;
- if (err != null)
- err = new TransactionRollbackException("Failed to commit transaction.", err);
+ synchronized (this) {
+ if (isDone())
+ return false;
- try {
- tx.finish(err == null);
+ if (err != null) {
+ tx.commitError(err);
+
+ boolean marked = tx.setRollbackOnly();
+
+ if (err instanceof IgniteTxRollbackCheckedException) {
+ if (marked) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
+ }
+ }
}
- catch (IgniteCheckedException e) {
- if (err != null)
- err.addSuppressed(e);
- else
- err = e;
+ else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error.
+ try {
+ tx.close();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to invalidate transaction: " + tx, ex);
+ }
}
}
+ if (initialized() || err != null) {
+ if (tx.needCheckBackup()) {
+ assert tx.onePhaseCommit();
+
+ if (err != null)
+ err = new TransactionRollbackException("Failed to commit transaction.", err);
+
+ try {
+ tx.finish(err == null);
+ }
+ catch (IgniteCheckedException e) {
+ if (err != null)
+ err.addSuppressed(e);
+ else
+ err = e;
+ }
+ }
+
if (tx.onePhaseCommit()) {
boolean commit = this.commit && err == null;
@@ -273,36 +272,35 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.tmFinish(commit);
}
- Throwable th = this.err.get();
-
- if (super.onDone(tx0, th != null ? th : err)) {
- if (error() instanceof IgniteTxHeuristicCheckedException) {
- AffinityTopologyVersion topVer = tx.topologyVersion();
+ if (super.onDone(tx0, err)) {
+ if (error() instanceof IgniteTxHeuristicCheckedException) {
+ AffinityTopologyVersion topVer = tx.topologyVersion();
- for (IgniteTxEntry e : tx.writeMap().values()) {
- GridCacheContext cacheCtx = e.context();
+ for (IgniteTxEntry e : tx.writeMap().values()) {
+ GridCacheContext cacheCtx = e.context();
- try {
- if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
- GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
+ try {
+ if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
+ GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
- if (entry != null)
- entry.invalidate(null, tx.xidVersion());
+ if (entry != null)
+ entry.invalidate(null, tx.xidVersion());
+ }
}
- }
- catch (Throwable t) {
- U.error(log, "Failed to invalidate entry.", t);
+ catch (Throwable t) {
+ U.error(log, "Failed to invalidate entry.", t);
- if (t instanceof Error)
- throw (Error)t;
+ if (t instanceof Error)
+ throw (Error)t;
+ }
}
}
- }
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
+ // Don't forget to clean up.
+ cctx.mvcc().removeFuture(this);
- return true;
+ return true;
+ }
}
}
@@ -321,7 +319,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* Completeness callback.
*/
private void onComplete() {
- onDone(tx, err.get());
+ onDone(tx);
}
/**
@@ -354,7 +352,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
markInitialized();
- if (!isSync()) {
+ if (!isSync() && !isDone()) {
boolean complete = true;
for (IgniteInternalFuture<?> f : pending())
@@ -367,15 +365,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else
- onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
+ onDone(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
}
catch (Error | RuntimeException e) {
- onError(e);
+ onDone(e);
throw e;
}
catch (IgniteCheckedException e) {
- onError(e);
+ onDone(e);
}
}
@@ -415,7 +413,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
"(backup has left grid): " + tx.xidVersion(), cause));
}
else if (backup.isLocal()) {
- boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion());
+ boolean committed = !cctx.tm().addRolledbackTx(tx);
readyNearMappingFromBackup(mapping);
@@ -515,6 +513,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param commit Commit flag.
*/
private void finishOnePhase(boolean commit) {
+ assert Thread.holdsLock(this);
+
+ if (finishOnePhaseCalled)
+ return;
+
+ finishOnePhaseCalled = true;
+
// No need to send messages as transaction was already committed on remote node.
// Finish local mapping only as we need send commit message to backups.
for (GridDistributedTxMapping m : mappings.values()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 883c285..db4a4b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
@@ -88,7 +87,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** DHT mappings. */
- private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
+ private Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
/** Future. */
@GridToStringExclude
@@ -217,7 +216,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override protected IgniteInternalFuture<Boolean> addReader(
- long msgId,
+ long msgId,
GridDhtCacheEntry cached,
IgniteTxEntry entry,
AffinityTopologyVersion topVer
@@ -472,7 +471,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* @return DHT map.
*/
- ConcurrentMap<UUID, GridDistributedTxMapping> mappings() {
+ Map<UUID, GridDistributedTxMapping> mappings() {
return mappings;
}
@@ -798,14 +797,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
catch (Error | RuntimeException e) {
commitErr.compareAndSet(null, e);
- fut0.onError(e);
+ fut0.onDone(e);
throw e;
}
catch (IgniteCheckedException e) {
commitErr.compareAndSet(null, e);
- fut0.onError(e);
+ fut0.onDone(e);
}
}
});
@@ -1152,8 +1151,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override protected GridCacheEntryEx entryEx(
- GridCacheContext cacheCtx,
- IgniteTxKey key,
+ GridCacheContext cacheCtx,
+ IgniteTxKey key,
AffinityTopologyVersion topVer
) {
if (cacheCtx.isColocated()) {
@@ -1245,7 +1244,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
@Override public void onRemap(AffinityTopologyVersion topVer) {
assert cctx.kernalContext().clientNode();
- mapped.set(false);
+ mapped = false;
nearLocallyMapped = false;
colocatedLocallyMapped = false;
txNodes = null;
@@ -1254,7 +1253,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
dhtMap.clear();
mappings.clear();
- this.topVer.set(topVer);
+ synchronized (this) {
+ this.topVer = topVer;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 20fb8c2..94af6bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -536,9 +536,8 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* @param commitVer Commit version.
- * @return {@code True} if version was set.
*/
- public boolean commitVersion(GridCacheVersion commitVer);
+ public void commitVersion(GridCacheVersion commitVer);
/**
* @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)