You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/27 06:42:41 UTC
ignite git commit: IGNITE-1537 Fixed near optimistic TX future to
avoid early completion.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5 895760eed -> de08cd554
IGNITE-1537 Fixed near optimistic TX future to avoid early completion.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de08cd55
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de08cd55
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de08cd55
Branch: refs/heads/ignite-1.5
Commit: de08cd554e75db0df06a5438da5012ebf6c7ad09
Parents: 895760e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Nov 27 08:40:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Nov 27 08:40:16 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 2 +-
...arOptimisticSerializableTxPrepareFuture.java | 92 ++++----------------
.../near/GridNearOptimisticTxPrepareFuture.java | 57 ++++++------
...ridNearOptimisticTxPrepareFutureAdapter.java | 70 +++++++++++++++
.../cache/local/GridLocalCacheEntry.java | 6 ++
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../util/future/GridCompoundFuture.java | 2 +-
7 files changed, 123 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c55bead..1d418ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -417,7 +417,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
U.error(log, "Failed to get result value for cache entry: " + cached, e);
}
catch (GridCacheEntryRemovedException e) {
- assert false : "Got entry removed exception while holding transactional lock on entry: " + e;
+ assert false : "Got entry removed exception while holding transactional lock on entry [e=" + e + ", cached=" + cached + ']';
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 144070c..916c7ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -39,12 +39,10 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -76,7 +74,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/** */
@GridToStringExclude
- private KeyLockFuture keyLockFut = new KeyLockFuture();
+ private KeyLockFuture keyLockFut;
/** */
@GridToStringExclude
@@ -134,7 +132,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
- keyLockFut.onKeyLocked(entry.txKey());
+ if (keyLockFut != null)
+ keyLockFut.onKeyLocked(entry.txKey());
return true;
}
@@ -189,7 +188,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
err.compareAndSet(null, e);
- keyLockFut.onDone(e);
+ if (keyLockFut != null)
+ keyLockFut.onDone(e);
}
/** {@inheritDoc} */
@@ -210,7 +210,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (err != null) {
this.err.compareAndSet(null, err);
- keyLockFut.onDone(err);
+ if (keyLockFut != null)
+ keyLockFut.onDone(err);
}
return onComplete();
@@ -335,10 +336,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
for (IgniteTxEntry read : reads)
map(read, topVer, mappings, remap, topLocked);
- keyLockFut.onAllKeysAdded();
-
- if (!remap)
- add(keyLockFut);
+ if (keyLockFut != null)
+ keyLockFut.onAllKeysAdded();
if (isDone()) {
if (log.isDebugEnabled())
@@ -535,8 +534,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) {
- if (entry.explicitVersion() == null)
+ if (entry.explicitVersion() == null) {
+ if (keyLockFut == null) {
+ keyLockFut = new KeyLockFuture();
+
+ add(keyLockFut);
+ }
+
keyLockFut.addLockKey(entry.txKey());
+ }
}
IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
@@ -854,68 +860,4 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
}
-
- /**
- * Keys lock future.
- */
- private class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
- /** */
- @GridToStringInclude
- private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
-
- /** */
- private volatile boolean allKeysAdded;
-
- /**
- * @param key Key to track for locking.
- */
- private void addLockKey(IgniteTxKey key) {
- assert !allKeysAdded;
-
- lockKeys.add(key);
- }
-
- /**
- * @param key Locked keys.
- */
- private void onKeyLocked(IgniteTxKey key) {
- lockKeys.remove(key);
-
- checkLocks();
- }
-
- /**
- * Moves future to the ready state.
- */
- private void onAllKeysAdded() {
- allKeysAdded = true;
-
- checkLocks();
- }
-
- /**
- * @return {@code True} if all locks are owned.
- */
- private boolean checkLocks() {
- boolean locked = lockKeys.isEmpty();
-
- if (locked && allKeysAdded) {
- if (log.isDebugEnabled())
- log.debug("All locks are acquired for near prepare future: " + this);
-
- onDone((GridNearTxPrepareResponse)null);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
- }
-
- return locked;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(KeyLockFuture.class, this, super.toString());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e70e574..ca1d36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -40,15 +40,15 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -65,8 +65,8 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
*/
public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
/** */
- @GridToStringInclude
- private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ @GridToStringExclude
+ private KeyLockFuture keyLockFut;
/**
* @param cctx Context.
@@ -84,10 +84,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
log.debug("Transaction future received owner changed callback: " + entry);
if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
- lockKeys.remove(entry.txKey());
-
- // This will check for locks.
- onDone();
+ if (keyLockFut != null)
+ keyLockFut.onKeyLocked(entry.txKey());
return true;
}
@@ -151,24 +149,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- /**
- * @return {@code True} if all locks are owned.
- */
- private boolean checkLocks() {
- boolean locked = lockKeys.isEmpty();
-
- if (locked) {
- if (log.isDebugEnabled())
- log.debug("All locks are acquired for near prepare future: " + this);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
- }
-
- return locked;
- }
-
/** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
@@ -215,8 +195,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/** {@inheritDoc} */
@Override public boolean onDone(IgniteInternalTx t, Throwable err) {
- // If locks were not acquired yet, delay completion.
- if (isDone() || (err == null && !checkLocks()))
+ if (isDone())
return false;
this.err.compareAndSet(null, err);
@@ -320,6 +299,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
return;
}
+ if (keyLockFut != null)
+ keyLockFut.onAllKeysAdded();
+
tx.addSingleEntryMapping(mapping, write);
cctx.mvcc().recheckPendingLocks();
@@ -385,6 +367,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
return;
}
+ if (keyLockFut != null)
+ keyLockFut.onAllKeysAdded();
+
tx.addEntryMapping(mappings);
cctx.mvcc().recheckPendingLocks();
@@ -543,8 +528,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
if (cacheCtx.isNear() || cacheCtx.isLocal()) {
- if (entry.explicitVersion() == null)
- lockKeys.add(entry.txKey());
+ if (entry.explicitVersion() == null) {
+ if (keyLockFut == null) {
+ keyLockFut = new KeyLockFuture();
+
+ add(keyLockFut);
+ }
+
+ keyLockFut.addLockKey(entry.txKey());
+ }
}
if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
@@ -594,10 +586,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
", loc=" + ((MiniFuture)f).node().isLocal() +
", done=" + f.isDone() + "]";
}
+ }, new P1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public boolean apply(IgniteInternalFuture<GridNearTxPrepareResponse> fut) {
+ return isMini(fut);
+ }
});
return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
"innerFuts", futs,
+ "keyLockFut", keyLockFut,
"tx", tx,
"super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 6b7244a..5c7553f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -17,14 +17,20 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -157,4 +163,68 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
protected abstract void prepare0(boolean remap, boolean topLocked);
+
+ /**
+ * Keys lock future.
+ */
+ protected static class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+ /** */
+ @GridToStringInclude
+ private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+ /** */
+ private volatile boolean allKeysAdded;
+
+ /**
+ * @param key Key to track for locking.
+ */
+ protected void addLockKey(IgniteTxKey key) {
+ assert !allKeysAdded;
+
+ lockKeys.add(key);
+ }
+
+ /**
+ * @param key Locked keys.
+ */
+ protected void onKeyLocked(IgniteTxKey key) {
+ lockKeys.remove(key);
+
+ checkLocks();
+ }
+
+ /**
+ * Moves future to the ready state.
+ */
+ protected void onAllKeysAdded() {
+ allKeysAdded = true;
+
+ checkLocks();
+ }
+
+ /**
+ * @return {@code True} if all locks are owned.
+ */
+ private boolean checkLocks() {
+ boolean locked = lockKeys.isEmpty();
+
+ if (locked && allKeysAdded) {
+ if (log.isDebugEnabled())
+ log.debug("All locks are acquired for near prepare future: " + this);
+
+ onDone((GridNearTxPrepareResponse)null);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
+ }
+
+ return locked;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(KeyLockFuture.class, this, super.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index 0ceae20..76bfc46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED;
@@ -434,4 +435,9 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
@Override protected void offHeapPointer(long valPtr) {
this.valPtr = valPtr;
}
+
+ /** {@inheritDoc} */
+ @Override public synchronized String toString() {
+ return S.toString(GridLocalCacheEntry.class, this, super.toString());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b3ff3a6..f13cff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -598,7 +598,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param entry Cache entry to check.
*/
private void checkCommitLocks(GridCacheEntryEx entry) {
- assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry +
+ assert ownsLockUnsafe(entry) : "Lock is not owned for commit [entry=" + entry +
", tx=" + this + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index cc296e6..4b2461e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -423,7 +423,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** {@inheritDoc} */
@Override public String toString() {
- return "Compound future listener: " + GridCompoundFuture.this;
+ return "Compound future listener []";
}
}
}