You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/03 19:57:08 UTC
ignite git commit: IGNITE-2265: Replaced atomics with updaters on hot
paths.
Repository: ignite
Updated Branches:
refs/heads/master 80579253f -> cd5cd2efe
IGNITE-2265: Replaced atomics with updaters on hot paths.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd5cd2ef
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd5cd2ef
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd5cd2ef
Branch: refs/heads/master
Commit: cd5cd2efe08efef42aef4f130474c563391e6fbe
Parents: 8057925
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Sun Jan 3 22:58:06 2016 +0400
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Sun Jan 3 22:58:06 2016 +0400
----------------------------------------------------------------------
.../GridDistributedTxRemoteAdapter.java | 12 ++-
.../dht/CacheDistributedGetFutureAdapter.java | 10 ++-
.../distributed/dht/GridDhtTxFinishFuture.java | 15 +++-
.../cache/distributed/dht/GridDhtTxLocal.java | 31 ++++---
.../distributed/dht/GridDhtTxPrepareFuture.java | 48 ++++++----
.../dht/GridPartitionedGetFuture.java | 2 +-
.../distributed/near/GridNearGetFuture.java | 2 +-
...arOptimisticSerializableTxPrepareFuture.java | 94 +++++++++++---------
.../near/GridNearOptimisticTxPrepareFuture.java | 63 +++++++------
.../GridNearPessimisticTxPrepareFuture.java | 6 +-
.../cache/distributed/near/GridNearTxLocal.java | 69 ++++++++------
.../near/GridNearTxPrepareFutureAdapter.java | 8 +-
.../cache/local/GridLocalLockFuture.java | 19 ++--
.../cache/transactions/IgniteTxAdapter.java | 24 ++---
.../cache/transactions/IgniteTxEntry.java | 12 ++-
.../transactions/IgniteTxLocalAdapter.java | 38 ++++----
.../internal/GridUpdateNotifierSelfTest.java | 2 +-
17 files changed, 284 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 1fd0b2e..8e9d4a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -25,7 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -87,6 +88,10 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
/** */
private static final long serialVersionUID = 0L;
+ /** Commit allowed field updater. */
+ private static final AtomicIntegerFieldUpdater<GridDistributedTxRemoteAdapter> COMMIT_ALLOWED_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(GridDistributedTxRemoteAdapter.class, "commitAllowed");
+
/** Explicit versions. */
@GridToStringInclude
private List<GridCacheVersion> explicitVers;
@@ -96,8 +101,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
private boolean started;
/** {@code True} only if all write entries are locked by this transaction. */
+ @SuppressWarnings("UnusedDeclaration")
@GridToStringInclude
- private AtomicBoolean commitAllowed = new AtomicBoolean(false);
+ private volatile int commitAllowed;
/** */
@GridToStringInclude
@@ -440,7 +446,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
// Only one thread gets to commit.
- if (commitAllowed.compareAndSet(false, true)) {
+ if (COMMIT_ALLOWED_UPD.compareAndSet(this, 0, 1)) {
IgniteCheckedException err = null;
Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index cfbc21b..c43cce9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -21,7 +21,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -50,6 +51,10 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
/** Maximum number of attempts to remap key to the same primary node. */
protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+ /** Remap count updater. */
+ protected static final AtomicIntegerFieldUpdater<CacheDistributedGetFutureAdapter> REMAP_CNT_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(CacheDistributedGetFutureAdapter.class, "remapCnt");
+
/** Context. */
protected final GridCacheContext<K, V> cctx;
@@ -69,7 +74,8 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
protected boolean trackable;
/** Remap count. */
- protected AtomicInteger remapCnt = new AtomicInteger();
+ @SuppressWarnings("UnusedDeclaration")
+ protected volatile int remapCnt;
/** Subject ID. */
protected UUID subjId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 9a0d778..0e5db05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -22,6 +22,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
@@ -57,6 +59,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** Error updater. */
+ private static final AtomicReferenceFieldUpdater<GridDhtTxFinishFuture, Throwable> ERR_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridDhtTxFinishFuture.class, Throwable.class, "err");
+
/** Logger. */
private static IgniteLogger log;
@@ -74,8 +80,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
private boolean commit;
/** Error. */
+ @SuppressWarnings("UnusedDeclaration")
@GridToStringExclude
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
+ private volatile Throwable err;
/** DHT mappings. */
private Map<UUID, GridDistributedTxMapping> dhtMap;
@@ -142,7 +149,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
* @param e Error.
*/
public void onError(Throwable e) {
- if (err.compareAndSet(null, e)) {
+ if (ERR_UPD.compareAndSet(this, null, e)) {
boolean marked = tx.setRollbackOnly();
if (e instanceof IgniteTxRollbackCheckedException) {
@@ -199,7 +206,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING))
this.tx.tmFinish(err == null);
- Throwable e = this.err.get();
+ Throwable e = this.err;
if (e == null && commit)
e = this.tx.commitError();
@@ -235,7 +242,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
* Completeness callback.
*/
private void onComplete() {
- onDone(tx, err.get());
+ onDone(tx, err);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index f344d48..e026b4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -22,7 +22,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -83,10 +84,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
/** Near XID. */
private GridCacheVersion nearXidVer;
+ /** Future updater. */
+ private static final AtomicReferenceFieldUpdater<GridDhtTxLocal, GridDhtTxPrepareFuture> PREP_FUT_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridDhtTxLocal.class, GridDhtTxPrepareFuture.class, "prepFut");
+
/** Future. */
+ @SuppressWarnings("UnusedDeclaration")
@GridToStringExclude
- private final AtomicReference<GridDhtTxPrepareFuture> prepFut =
- new AtomicReference<>();
+ private volatile GridDhtTxPrepareFuture prepFut;
/**
* Empty constructor required for {@link Externalizable}.
@@ -306,18 +311,18 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
// For pessimistic mode we don't distribute prepare request.
- GridDhtTxPrepareFuture fut = prepFut.get();
+ GridDhtTxPrepareFuture fut = prepFut;
if (fut == null) {
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
+ if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
nearMiniId,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
true,
needReturnValue())))
- return prepFut.get();
+ return prepFut;
}
else
// Prepare was called explicitly.
@@ -383,20 +388,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
boolean last
) {
// In optimistic mode prepare still can be called explicitly from salvageTx.
- GridDhtTxPrepareFuture fut = prepFut.get();
+ GridDhtTxPrepareFuture fut = prepFut;
if (fut == null) {
init();
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
+ if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
nearMiniId,
verMap,
last,
needReturnValue()))) {
- GridDhtTxPrepareFuture f = prepFut.get();
+ GridDhtTxPrepareFuture f = prepFut;
assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
"[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
@@ -492,7 +497,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
cctx.mvcc().addFuture(fut, fut.futureId());
- GridDhtTxPrepareFuture prep = prepFut.get();
+ GridDhtTxPrepareFuture prep = prepFut;
if (prep != null) {
if (prep.isDone()) {
@@ -571,12 +576,12 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
@Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) {
assert optimistic();
- prepFut.compareAndSet(fut, null);
+ PREP_FUT_UPD.compareAndSet(this, fut, null);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
- GridDhtTxPrepareFuture prepFut = this.prepFut.get();
+ GridDhtTxPrepareFuture prepFut = this.prepFut;
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false);
@@ -687,7 +692,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
- return prepFut.get();
+ return prepFut;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 47dafc8..23fdbf5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -26,8 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
@@ -100,6 +101,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** Error updater. */
+ private static final AtomicReferenceFieldUpdater<GridDhtTxPrepareFuture, Throwable> ERR_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, Throwable.class, "err");
+
/** */
private static final IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse> REDUCER =
new IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse>() {
@@ -113,6 +118,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
};
+ /** Replied flag updater. */
+ private static final AtomicIntegerFieldUpdater<GridDhtTxPrepareFuture> REPLIED_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, "replied");
+
+ /** Mapped flag updater. */
+ private static final AtomicIntegerFieldUpdater<GridDhtTxPrepareFuture> MAPPED_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, "mapped");
+
/** Logger. */
private static IgniteLogger log;
@@ -133,13 +146,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private Map<UUID, GridDistributedTxMapping> dhtMap;
/** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile Throwable err;
/** Replied flag. */
- private AtomicBoolean replied = new AtomicBoolean(false);
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile int replied;
/** All replies flag. */
- private AtomicBoolean mapped = new AtomicBoolean(false);
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile int mapped;
/** Prepare reads. */
private Iterable<IgniteTxEntry> reads;
@@ -570,9 +586,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*
* @return {@code True} if all locks are acquired, {@code false} otherwise.
*/
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
private boolean mapIfLocked() {
if (checkLocks()) {
- if (!mapped.compareAndSet(false, true))
+ if (!MAPPED_UPD.compareAndSet(this, 0, 1))
return false;
if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
@@ -606,7 +623,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
"pending mini futures: " + this;
- this.err.compareAndSet(null, err);
+ ERR_UPD.compareAndSet(this, null, err);
// Must clear prepare future before response is sent or listeners are notified.
if (tx.optimistic())
@@ -616,7 +633,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.onePhaseCommit() && tx.commitOnPrepare()) {
assert last;
- Throwable prepErr = this.err.get();
+ Throwable prepErr = this.err;
// Must create prepare response before transaction is committed to grab correct return value.
final GridNearTxPrepareResponse res = createPrepareResponse(prepErr);
@@ -631,7 +648,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
try {
- if (replied.compareAndSet(false, true))
+ if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1))
sendPrepareResponse(res);
}
catch (IgniteCheckedException e) {
@@ -669,7 +686,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
else {
try {
- if (replied.compareAndSet(false, true))
+ if (REPLIED_UPD.compareAndSet(this, 0, 1))
sendPrepareResponse(res);
}
catch (IgniteCheckedException e) {
@@ -680,8 +697,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return true;
}
else {
- if (replied.compareAndSet(false, true)) {
- GridNearTxPrepareResponse res = createPrepareResponse(this.err.get());
+ if (REPLIED_UPD.compareAndSet(this, 0, 1)) {
+ GridNearTxPrepareResponse res = createPrepareResponse(this.err);
try {
sendPrepareResponse(res);
@@ -720,7 +737,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
if (!tx.nearNodeId().equals(cctx.localNodeId())) {
- Throwable err = this.err.get();
+ Throwable err = this.err;
if (err != null && err instanceof IgniteFutureCancelledException)
return;
@@ -851,7 +868,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (last || tx.isSystemInvalidate())
tx.state(PREPARED);
- if (super.onDone(res, err.get())) {
+ if (super.onDone(res, err)) {
// Don't forget to clean up.
cctx.mvcc().removeMvccFuture(this);
@@ -1045,11 +1062,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
if (err0 != null) {
- err.compareAndSet(null, err0);
+ ERR_UPD.compareAndSet(this, null, err0);
tx.rollbackAsync();
- final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
+ final GridNearTxPrepareResponse res = createPrepareResponse(err);
onDone(res, res.error());
@@ -1467,6 +1484,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param res Result callback.
*/
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
void onResult(GridDhtTxPrepareResponse res) {
if (res.error() != null)
// Fail the whole compound future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index e8aaca0..19df1c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -474,7 +474,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
if (keys != null && keys.containsKey(key)) {
- if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
+ if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
U.toShortString(node) + ", mappings=" + mapped + ']'));
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index a121af9..c547a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -577,7 +577,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
if (keys != null && keys.containsKey(key)) {
- if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
+ if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
MAX_REMAP_CNT + " attempts (key got remapped to the same node) " +
"[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']'));
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 37dc564..2090e04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -23,7 +23,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
@@ -118,7 +119,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " +
"read/write conflict [key=" + key + ", cache=" + ctx.name() + ']');
- err.compareAndSet(null, err0);
+ ERR_UPD.compareAndSet(this, null, err0);
}
break;
@@ -187,7 +188,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
tx.removeMapping(m.node().id());
}
- err.compareAndSet(null, e);
+ ERR_UPD.compareAndSet(this, null, e);
if (keyLockFut != null)
keyLockFut.onDone(e);
@@ -209,7 +210,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
return false;
if (err != null) {
- this.err.compareAndSet(null, err);
+ ERR_UPD.compareAndSet(this, null, err);
if (keyLockFut != null)
keyLockFut.onDone(err);
@@ -263,7 +264,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @return {@code True} if future was finished by this call.
*/
private boolean onComplete() {
- Throwable err0 = err.get();
+ Throwable err0 = err;
if (err0 == null || tx.needCheckBackup())
tx.state(PREPARED);
@@ -366,7 +367,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- add(new MiniFuture(m));
+ add(new MiniFuture(this, m));
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -410,7 +411,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @return {@code True} if skip future during remap.
*/
private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) {
- return !(isMini(fut)) || (remap && ((MiniFuture)fut).rcvRes.get());
+ return !(isMini(fut)) || (remap && (((MiniFuture)fut).rcvRes == 1));
}
/**
@@ -630,7 +631,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
*
*/
- private class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
+ private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
/** */
private boolean remap = true;
@@ -660,24 +661,34 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
*
*/
- private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+ private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
+ /** Receive result flag updater. */
+ private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
+
/** */
private final IgniteUuid futId = IgniteUuid.randomUuid();
+ /** Parent future. */
+ private final GridNearOptimisticSerializableTxPrepareFuture parent;
+
/** Keys. */
@GridToStringInclude
private GridDistributedTxMapping m;
/** Flag to signal some result being processed. */
- private AtomicBoolean rcvRes = new AtomicBoolean(false);
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile int rcvRes;
/**
+ * @param parent Parent future.
* @param m Mapping.
*/
- MiniFuture(GridDistributedTxMapping m) {
+ MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m) {
+ this.parent = parent;
this.m = m;
}
@@ -706,8 +717,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param e Error.
*/
void onResult(Throwable e) {
- if (rcvRes.compareAndSet(false, true)) {
- onError(m, e);
+ if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
+ parent.onError(m, e);
if (log.isDebugEnabled())
log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
@@ -717,7 +728,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
else
U.warn(log, "Received error after another result has been processed [fut=" +
- GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e);
+ parent + ", mini=" + this + ']', e);
}
/**
@@ -727,11 +738,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (isDone())
return;
- if (rcvRes.compareAndSet(false, true)) {
+ if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
- onError(null, e);
+ parent.onError(null, e);
onDone(e);
}
@@ -740,40 +751,40 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
* @param res Result callback.
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
void onResult(final GridNearTxPrepareResponse res) {
if (isDone())
return;
- if (rcvRes.compareAndSet(false, true)) {
+ if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
if (res.error() != null) {
// Fail the whole compound future.
- onError(m, res.error());
+ parent.onError(m, res.error());
onDone(res.error());
}
else {
if (res.clientRemapVersion() != null) {
- assert cctx.kernalContext().clientNode();
+ assert parent.cctx.kernalContext().clientNode();
assert m.clientFirst();
- tx.removeMapping(m.node().id());
+ parent.tx.removeMapping(m.node().id());
ClientRemapFuture remapFut0 = null;
- synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
- if (remapFut == null) {
- remapFut = new ClientRemapFuture();
+ synchronized (parent) {
+ if (parent.remapFut == null) {
+ parent.remapFut = new ClientRemapFuture();
- remapFut0 = remapFut;
+ remapFut0 = parent.remapFut;
}
}
if (remapFut0 != null) {
- Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
+ Collection<IgniteInternalFuture<?>> futs = (Collection)parent.futures();
for (IgniteInternalFuture<?> fut : futs) {
- if (isMini(fut) && fut != this)
+ if (parent.isMini(fut) && fut != this)
remapFut0.add((MiniFuture)fut);
}
@@ -783,22 +794,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
@Override public void apply(IgniteInternalFuture<Boolean> remapFut0) {
try {
IgniteInternalFuture<?> affFut =
- cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+ parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
if (affFut == null)
affFut = new GridFinishedFuture<Object>();
- if (remapFut.get()) {
+ if (parent.remapFut.get()) {
if (log.isDebugEnabled()) {
log.debug("Will remap client tx [" +
- "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this +
+ "fut=" + parent +
", topVer=" + res.topologyVersion() + ']');
}
- synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
- assert remapFut0 == remapFut;
+ synchronized (parent) {
+ assert remapFut0 == parent.remapFut;
- remapFut = null;
+ parent.remapFut = null;
}
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -809,7 +820,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
remap(res);
}
catch (IgniteCheckedException e) {
- err.compareAndSet(null, e);
+ ERR_UPD.compareAndSet(parent, null, e);
onDone(e);
}
@@ -822,7 +833,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
err0.retryReadyFuture(affFut);
- err.compareAndSet(null, err0);
+ ERR_UPD.compareAndSet(parent, null, err0);
onDone(err0);
}
@@ -830,10 +841,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
catch (IgniteCheckedException e) {
if (log.isDebugEnabled()) {
log.debug("Prepare failed, will not remap tx: " +
- GridNearOptimisticSerializableTxPrepareFuture.this);
+ parent);
}
- err.compareAndSet(null, e);
+ ERR_UPD.compareAndSet(parent, null, e);
onDone(e);
}
@@ -844,10 +855,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
onDone(res);
}
else {
- onPrepareResponse(m, res);
+ parent.onPrepareResponse(m, res);
// Finish this mini future (need result only on client node).
- onDone(cctx.kernalContext().clientNode() ? res : null);
+ onDone(parent.cctx.kernalContext().clientNode() ? res : null);
}
}
}
@@ -857,8 +868,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param res Response.
*/
private void remap(final GridNearTxPrepareResponse res) {
- prepareOnTopology(true, new Runnable() {
- @Override public void run() {
+ parent.prepareOnTopology(true, new Runnable() {
+ @Override
+ public void run() {
onDone(res);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index a9f158a..bae0327 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -24,7 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
@@ -132,7 +133,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- if (err.compareAndSet(null, e)) {
+ if (ERR_UPD.compareAndSet(this, null, e)) {
boolean marked = tx.setRollbackOnly();
if (e instanceof IgniteTxRollbackCheckedException) {
@@ -199,7 +200,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (isDone())
return false;
- this.err.compareAndSet(null, err);
+ ERR_UPD.compareAndSet(this, null, err);
return onComplete();
}
@@ -218,7 +219,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @return {@code True} if future was finished by this call.
*/
private boolean onComplete() {
- Throwable err0 = err.get();
+ Throwable err0 = err;
if (err0 == null || tx.needCheckBackup())
tx.state(PREPARED);
@@ -448,7 +449,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- final MiniFuture fut = new MiniFuture(m, mappings);
+ final MiniFuture fut = new MiniFuture(this, m, mappings);
req.miniId(fut.futureId());
@@ -611,10 +612,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
*
*/
- private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+ private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
+ /** Receive result flag updater. */
+ private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
+
+ /** Parent future. */
+ private final GridNearOptimisticTxPrepareFuture parent;
+
/** */
private final IgniteUuid futId = IgniteUuid.randomUuid();
@@ -623,19 +631,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
private GridDistributedTxMapping m;
/** Flag to signal some result being processed. */
- private AtomicBoolean rcvRes = new AtomicBoolean(false);
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile int rcvRes;
/** Mappings to proceed prepare. */
private Queue<GridDistributedTxMapping> mappings;
/**
+ * @param parent Parent.
* @param m Mapping.
* @param mappings Queue of mappings to proceed with.
*/
- MiniFuture(
- GridDistributedTxMapping m,
- Queue<GridDistributedTxMapping> mappings
- ) {
+ MiniFuture(GridNearOptimisticTxPrepareFuture parent, GridDistributedTxMapping m,
+ Queue<GridDistributedTxMapping> mappings) {
+ this.parent = parent;
this.m = m;
this.mappings = mappings;
}
@@ -665,7 +674,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param e Error.
*/
void onResult(Throwable e) {
- if (rcvRes.compareAndSet(false, true)) {
+ if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
if (log.isDebugEnabled())
log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
@@ -674,7 +683,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
else
U.warn(log, "Received error after another result has been processed [fut=" +
- GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e);
+ parent + ", mini=" + this + ']', e);
}
/**
@@ -684,13 +693,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (isDone())
return;
- if (rcvRes.compareAndSet(false, true)) {
+ if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
// Fail the whole future (make sure not to remap on different primary node
// to prevent multiple lock coordinators).
- onError(e);
+ parent.onError(e);
}
}
@@ -698,21 +707,23 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param nodeId Failed node ID.
* @param res Result callback.
*/
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
if (isDone())
return;
- if (rcvRes.compareAndSet(false, true)) {
+ if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
if (res.error() != null) {
// Fail the whole compound future.
- onError(res.error());
+ parent.onError(res.error());
}
else {
if (res.clientRemapVersion() != null) {
- assert cctx.kernalContext().clientNode();
+ assert parent.cctx.kernalContext().clientNode();
assert m.clientFirst();
- IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+ IgniteInternalFuture<?> affFut =
+ parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
if (affFut != null && !affFut.isDone()) {
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -730,13 +741,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
else
remap();
- }
- else {
- onPrepareResponse(m, res);
+ } else {
+ parent.onPrepareResponse(m, res);
// Proceed prepare before finishing mini future.
if (mappings != null)
- proceedPrepare(mappings);
+ parent.proceedPrepare(mappings);
// Finish this mini future.
onDone((GridNearTxPrepareResponse)null);
@@ -749,9 +759,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*
*/
private void remap() {
- prepareOnTopology(true, new Runnable() {
- @Override public void run() {
- onDone((GridNearTxPrepareResponse)null);
+ parent.prepareOnTopology(true, new Runnable() {
+ @Override
+ public void run() {
+ onDone((GridNearTxPrepareResponse) null);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index ffe5373..9ee9aea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -279,9 +279,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) {
if (err != null)
- this.err.compareAndSet(null, err);
+ ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, err);
- err = this.err.get();
+ err = this.err;
if (err == null || tx.needCheckBackup())
tx.state(PREPARED);
@@ -384,7 +384,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (log.isDebugEnabled())
log.debug("Error on tx prepare [fut=" + this + ", err=" + e + ", tx=" + tx + ']');
- if (err.compareAndSet(null, e))
+ if (ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, e))
tx.setRollbackOnly();
onDone(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ae4972e..aa4e929f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
@@ -84,20 +85,35 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** */
private static final long serialVersionUID = 0L;
+ /** Prepare future updater. */
+ private static final AtomicReferenceFieldUpdater<GridNearTxLocal, IgniteInternalFuture> PREP_FUT_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, IgniteInternalFuture.class, "prepFut");
+
+ /** Prepare future updater. */
+ private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> COMMIT_FUT_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "commitFut");
+
+ /** Rollback future updater. */
+ private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> ROLLBACK_FUT_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "rollbackFut");
+
/** DHT mappings. */
private IgniteTxMappings mappings;
- /** Future. */
+ /** Prepare future. */
+ @SuppressWarnings("UnusedDeclaration")
@GridToStringExclude
- private final AtomicReference<IgniteInternalFuture<?>> prepFut = new AtomicReference<>();
+ private volatile IgniteInternalFuture<?> prepFut;
- /** */
+ /** Commit future. */
+ @SuppressWarnings("UnusedDeclaration")
@GridToStringExclude
- private final AtomicReference<GridNearTxFinishFuture> commitFut = new AtomicReference<>();
+ private volatile GridNearTxFinishFuture commitFut;
- /** */
+ /** Rollback future. */
+ @SuppressWarnings("UnusedDeclaration")
@GridToStringExclude
- private final AtomicReference<GridNearTxFinishFuture> rollbackFut = new AtomicReference<>();
+ private volatile GridNearTxFinishFuture rollbackFut;
/** Entries to lock on next step of prepare stage. */
private Collection<IgniteTxEntry> optimisticLockEntries = Collections.emptyList();
@@ -225,7 +241,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) {
- prepFut.compareAndSet(fut, null);
+ PREP_FUT_UPD.compareAndSet(this, fut, null);
}
/** {@inheritDoc} */
@@ -630,7 +646,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
- GridCacheMvccFuture<IgniteInternalTx> fut = (GridCacheMvccFuture<IgniteInternalTx>)prepFut.get();
+ GridCacheMvccFuture<IgniteInternalTx> fut = (GridCacheMvccFuture<IgniteInternalTx>)prepFut;
return fut != null && fut.onOwnerChanged(entry, owner);
}
@@ -784,7 +800,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> prepareAsync() {
- GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get();
+ GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut;
if (fut == null) {
// Future must be created before any exception can be thrown.
@@ -796,8 +812,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
else
fut = new GridNearPessimisticTxPrepareFuture(cctx, this);
- if (!prepFut.compareAndSet(null, fut))
- return prepFut.get();
+ if (!PREP_FUT_UPD.compareAndSet(this, null, fut))
+ return prepFut;
}
else
// Prepare was called explicitly.
@@ -818,18 +834,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
prepareAsync();
- GridNearTxFinishFuture fut = commitFut.get();
+ GridNearTxFinishFuture fut = commitFut;
- if (fut == null && !commitFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, true)))
- return commitFut.get();
+ if (fut == null &&
+ !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true)))
+ return commitFut;
cctx.mvcc().addFuture(fut, fut.futureId());
- final IgniteInternalFuture<?> prepareFut = prepFut.get();
+ final IgniteInternalFuture<?> prepareFut = prepFut;
prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
- GridNearTxFinishFuture fut0 = commitFut.get();
+ GridNearTxFinishFuture fut0 = commitFut;
try {
// Make sure that here are no exceptions.
@@ -838,14 +855,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
fut0.finish();
}
catch (Error | RuntimeException e) {
- commitErr.compareAndSet(null, e);
+ COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
fut0.onDone(e);
throw e;
}
catch (IgniteCheckedException e) {
- commitErr.compareAndSet(null, e);
+ COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
fut0.onDone(e);
}
@@ -860,17 +877,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (log.isDebugEnabled())
log.debug("Rolling back near tx: " + this);
- GridNearTxFinishFuture fut = rollbackFut.get();
+ GridNearTxFinishFuture fut = rollbackFut;
if (fut != null)
return fut;
- if (!rollbackFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, false)))
- return rollbackFut.get();
+ if (!ROLLBACK_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, false)))
+ return rollbackFut;
cctx.mvcc().addFuture(fut, fut.futureId());
- IgniteInternalFuture<?> prepFut = this.prepFut.get();
+ IgniteInternalFuture<?> prepFut = this.prepFut;
if (prepFut == null || prepFut.isDone()) {
try {
@@ -897,7 +914,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
}
- GridNearTxFinishFuture fut0 = rollbackFut.get();
+ GridNearTxFinishFuture fut0 = rollbackFut;
fut0.finish();
}
@@ -997,7 +1014,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (pessimistic())
prepareAsync();
- IgniteInternalFuture<?> prep = prepFut.get();
+ IgniteInternalFuture<?> prep = prepFut;
// Do not create finish future if there are no remote nodes.
if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
@@ -1070,7 +1087,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut, fut.futureId());
- IgniteInternalFuture<?> prep = prepFut.get();
+ IgniteInternalFuture<?> prep = prepFut;
if (prep == null || prep.isDone()) {
try {
@@ -1279,7 +1296,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
- return prepFut.get();
+ return prepFut;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 52cad91..ce4d2e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -53,6 +54,10 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** Error updater. */
+ protected static final AtomicReferenceFieldUpdater<GridNearTxPrepareFutureAdapter, Throwable> ERR_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridNearTxPrepareFutureAdapter.class, Throwable.class, "err");
+
/** */
private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx> REDUCER =
new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() {
@@ -81,7 +86,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/** Error. */
@GridToStringExclude
- protected AtomicReference<Throwable> err = new AtomicReference<>(null);
+ protected volatile Throwable err;
/** Trackable flag. */
protected boolean trackable = true;
@@ -165,6 +170,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param m Mapping.
* @param res Response.
*/
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
if (res == null)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 9f53c18..2e41f2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -22,6 +22,8 @@ import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -53,6 +55,10 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** Error updater. */
+ private static final AtomicReferenceFieldUpdater<GridLocalLockFuture, Throwable> ERR_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridLocalLockFuture.class, Throwable.class, "err");
+
/** Logger. */
private static IgniteLogger log;
@@ -79,7 +85,8 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
private GridCacheVersion lockVer;
/** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile Throwable err;
/** Timeout object. */
@GridToStringExclude
@@ -274,7 +281,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
* @param t Error.
*/
void onError(Throwable t) {
- if (err.compareAndSet(null, t))
+ if (ERR_UPD.compareAndSet(this, null, t))
onFailed();
}
@@ -392,7 +399,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
if (!success)
undoLocks();
- if (onDone(success, err.get())) {
+ if (onDone(success, err)) {
if (log.isDebugEnabled())
log.debug("Completing future: " + this);
@@ -409,8 +416,10 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
* @throws IgniteCheckedException If execution failed.
*/
private void checkError() throws IgniteCheckedException {
- if (err.get() != null)
- throw U.cast(err.get());
+ Throwable err0 = err;
+
+ if (err0 != null)
+ throw U.cast(err0);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 53f4f56..22e27c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
@@ -95,14 +96,17 @@ import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
/**
* Managed transaction adapter.
*/
-public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
- implements IgniteInternalTx, Externalizable {
+public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implements IgniteInternalTx, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Static logger to avoid re-creation. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** Finalizing status updater. */
+ private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, FinalizationStatus> FINALIZING_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, FinalizationStatus.class, "finalizing");
+
/** Logger. */
protected static IgniteLogger log;
@@ -191,8 +195,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** Commit version. */
private volatile GridCacheVersion commitVer;
- /** */
- private AtomicReference<FinalizationStatus> finalizing = new AtomicReference<>(FinalizationStatus.NONE);
+ /** Finalizing status. */
+ private volatile FinalizationStatus finalizing = FinalizationStatus.NONE;
/** Done marker. */
protected volatile boolean isDone;
@@ -524,23 +528,23 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
switch (status) {
case USER_FINISH:
- res = finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.USER_FINISH);
+ res = FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.USER_FINISH);
break;
case RECOVERY_WAIT:
- finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT);
+ FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT);
- FinalizationStatus cur = finalizing.get();
+ FinalizationStatus cur = finalizing;
res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH;
break;
case RECOVERY_FINISH:
- FinalizationStatus old = finalizing.get();
+ FinalizationStatus old = finalizing;
- res = old != FinalizationStatus.USER_FINISH && finalizing.compareAndSet(old, status);
+ res = old != FinalizationStatus.USER_FINISH && FINALIZING_UPD.compareAndSet(this, old, status);
break;
@@ -565,7 +569,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
* @return Finalization status.
*/
protected FinalizationStatus finalizationStatus() {
- return finalizing.get();
+ return finalizing;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 2c6c3df..c42bc7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedList;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
@@ -73,6 +73,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** Dummy version for any existing entry read in SERIALIZABLE transaction. */
public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1);
+ /** Prepared flag updater. */
+ private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
+
/** Owning transaction. */
@GridToStringExclude
@GridDirectTransient
@@ -149,9 +153,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
private GridCacheContext<?, ?> ctx;
/** Prepared flag to prevent multiple candidate add. */
- @SuppressWarnings({"TransientFieldNotInitialized"})
+ @SuppressWarnings("UnusedDeclaration")
@GridDirectTransient
- private AtomicBoolean prepared = new AtomicBoolean();
+ private transient volatile int prepared;
/** Lock flag for collocated cache. */
@GridDirectTransient
@@ -441,7 +445,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
* @return True if entry was marked prepared by this call.
*/
boolean markPrepared() {
- return prepared.compareAndSet(false, true);
+ return PREPARED_UPD.compareAndSet(this, 0, 1);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 70c79a5..21ff0cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -29,8 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.expiry.Duration;
@@ -115,16 +115,24 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
* Transaction adapter for cache transactions.
*/
-public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
- implements IgniteTxLocalEx {
+public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements IgniteTxLocalEx {
/** */
private static final long serialVersionUID = 0L;
+ /** Commit error updater. */
+ protected static final AtomicReferenceFieldUpdater<IgniteTxLocalAdapter, Throwable> COMMIT_ERR_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, Throwable.class, "commitErr");
+
+ /** Done flag updater. */
+ protected static final AtomicIntegerFieldUpdater<IgniteTxLocalAdapter> DONE_FLAG_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, "doneFlag");
+
/** Minimal version encountered (either explicit lock or XID of this transaction). */
protected GridCacheVersion minVer;
/** Flag indicating with TM commit happened. */
- protected AtomicBoolean doneFlag = new AtomicBoolean(false);
+ @SuppressWarnings("UnusedDeclaration")
+ protected volatile int doneFlag;
/** Committed versions, relative to base. */
private Collection<GridCacheVersion> committedVers = Collections.emptyList();
@@ -139,7 +147,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
private boolean sndTransformedVals;
/** Commit error. */
- protected AtomicReference<Throwable> commitErr = new AtomicReference<>();
+ protected volatile Throwable commitErr;
/** Need return value. */
protected boolean needRetVal;
@@ -248,12 +256,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public Throwable commitError() {
- return commitErr.get();
+ return commitErr;
}
/** {@inheritDoc} */
@Override public void commitError(Throwable e) {
- commitErr.compareAndSet(null, e);
+ COMMIT_ERR_UPD.compareAndSet(this, null, e);
}
/** {@inheritDoc} */
@@ -1164,7 +1172,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
U.error(log, "Heuristic transaction failure.", err);
- commitErr.compareAndSet(null, err);
+ COMMIT_ERR_UPD.compareAndSet(this, null, err);
state(UNKNOWN);
@@ -1194,7 +1202,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// Do not unlock transaction entries if one-phase commit.
if (!onePhaseCommit()) {
- if (doneFlag.compareAndSet(false, true)) {
+ if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
// Unlock all locks.
cctx.tm().commitTx(this);
@@ -1215,7 +1223,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
public void tmFinish(boolean commit) {
assert onePhaseCommit();
- if (doneFlag.compareAndSet(false, true)) {
+ if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
// Unlock all locks.
if (commit)
cctx.tm().commitTx(this);
@@ -1272,8 +1280,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (state != ROLLING_BACK && state != ROLLED_BACK) {
setRollbackOnly();
- throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']',
- commitErr.get());
+ throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state +
+ ", tx=" + this + ']', commitErr);
}
if (near()) {
@@ -1283,7 +1291,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
evictNearEntry(e, false);
}
- if (doneFlag.compareAndSet(false, true)) {
+ if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
try {
cctx.tm().rollbackTx(this);
@@ -1302,7 +1310,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
catch (Error | IgniteCheckedException | RuntimeException e) {
- U.addLastCause(e, commitErr.get(), log);
+ U.addLastCause(e, commitErr, log);
throw e;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
index 93fd916..afaa645 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
@@ -37,7 +37,7 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
- return 30 * 1000;
+ return 120 * 1000;
}
/** {@inheritDoc} */