You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/24 10:22:19 UTC
[21/50] [abbrv] ignite git commit: Merge branch ignite-1.5 into ignite-1.5-tx-futs-opts
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 7e17efe,b266ad2..bd6c2a7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -142,9 -143,9 +142,12 @@@ public final class GridDhtColocatedLock
/** Skip store flag. */
private final boolean skipStore;
+ /** */
+ private Deque<GridNearLockMapping> mappings;
+
+ /** Keep binary. */
+ private final boolean keepBinary;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@@ -727,213 -720,214 +732,242 @@@
*/
private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) {
try {
- AffinityTopologyVersion topVer = this.topVer.get();
+ map0(
+ keys,
+ remap,
+ topLocked);
+ }
+ catch (IgniteCheckedException ex) {
+ onDone(false, ex);
+ }
+ }
- assert topVer != null;
+ private synchronized void map0(
+ Collection<KeyCacheObject> keys,
+ boolean remap,
+ boolean topLocked
+ ) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = this.topVer;
- assert topVer.topologyVersion() > 0;
+ assert topVer != null;
- if (CU.affinityNodes(cctx, topVer).isEmpty()) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid): " + cctx.name()));
+ assert topVer.topologyVersion() > 0;
- return;
- }
+ if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid): " + cctx.name()));
- boolean clientNode = cctx.kernalContext().clientNode();
+ return;
+ }
- assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+ boolean clientNode = cctx.kernalContext().clientNode();
- // First assume this node is primary for all keys passed in.
- if (!clientNode && mapAsPrimary(keys, topVer))
- return;
+ assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
- Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
+ // First assume this node is primary for all keys passed in.
+ if (!clientNode && mapAsPrimary(keys, topVer))
+ return;
- // Assign keys to primary nodes.
- GridNearLockMapping map = null;
+ mappings = new ArrayDeque<>();
- for (KeyCacheObject key : keys) {
- GridNearLockMapping updated = map(key, map, topVer);
+ // Assign keys to primary nodes.
+ GridNearLockMapping map = null;
- // If new mapping was created, add to collection.
- if (updated != map) {
- mappings.add(updated);
+ for (KeyCacheObject key : keys) {
+ GridNearLockMapping updated = map(key, map, topVer);
- if (tx != null && updated.node().isLocal())
- tx.colocatedLocallyMapped(true);
- }
+ // If new mapping was created, add to collection.
+ if (updated != map) {
+ mappings.add(updated);
- map = updated;
+ if (tx != null && updated.node().isLocal())
+ tx.colocatedLocallyMapped(true);
}
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done: " + this);
-
- return;
- }
+ map = updated;
+ }
+ if (isDone()) {
if (log.isDebugEnabled())
- log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
+ log.debug("Abandoning (re)map because future is done: " + this);
+
+ return;
+ }
- boolean hasRmtNodes = false;
+ if (log.isDebugEnabled())
+ log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
- boolean first = true;
+ boolean hasRmtNodes = false;
- // Create mini futures.
- for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
- GridNearLockMapping mapping = iter.next();
+ boolean first = true;
- ClusterNode node = mapping.node();
- Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
+ // Create mini futures.
+ for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
+ GridNearLockMapping mapping = iter.next();
- boolean loc = node.equals(cctx.localNode());
+ ClusterNode node = mapping.node();
+ Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
- assert !mappedKeys.isEmpty();
+ boolean loc = node.equals(cctx.localNode());
- GridNearLockRequest req = null;
+ assert !mappedKeys.isEmpty();
- Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
+ GridNearLockRequest req = null;
- for (KeyCacheObject key : mappedKeys) {
- IgniteTxKey txKey = cctx.txKey(key);
+ Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
- GridDistributedCacheEntry entry = null;
+ for (KeyCacheObject key : mappedKeys) {
+ IgniteTxKey txKey = cctx.txKey(key);
- if (tx != null) {
- IgniteTxEntry txEntry = tx.entry(txKey);
+ GridDistributedCacheEntry entry = null;
- if (txEntry != null) {
- entry = (GridDistributedCacheEntry)txEntry.cached();
+ if (tx != null) {
+ IgniteTxEntry txEntry = tx.entry(txKey);
- if (entry != null && !(loc ^ entry.detached())) {
- entry = cctx.colocated().entryExx(key, topVer, true);
+ if (txEntry != null) {
+ entry = (GridDistributedCacheEntry)txEntry.cached();
- txEntry.cached(entry);
- }
+ if (entry != null && !(loc ^ entry.detached())) {
+ entry = cctx.colocated().entryExx(key, topVer, true);
+
+ txEntry.cached(entry);
}
}
+ }
- boolean explicit;
+ boolean explicit;
- while (true) {
- try {
- if (entry == null)
- entry = cctx.colocated().entryExx(key, topVer, true);
+ while (true) {
+ try {
+ if (entry == null)
+ entry = cctx.colocated().entryExx(key, topVer, true);
- if (!cctx.isAll(entry, filter)) {
- if (log.isDebugEnabled())
- log.debug("Entry being locked did not pass filter (will not lock): " + entry);
+ if (!cctx.isAll(entry, filter)) {
+ if (log.isDebugEnabled())
+ log.debug("Entry being locked did not pass filter (will not lock): " + entry);
- onComplete(false, false);
+ onComplete(false, false);
- return;
- }
+ return;
+ }
- assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
+ assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
- GridCacheMvccCandidate cand = addEntry(entry);
+ GridCacheMvccCandidate cand = addEntry(entry);
- // Will either return value from dht cache or null if this is a miss.
- IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
- ((GridDhtCacheEntry)entry).versionedValue(topVer);
+ // Will either return value from dht cache or null if this is a miss.
+ IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
+ ((GridDhtCacheEntry)entry).versionedValue(topVer);
- GridCacheVersion dhtVer = null;
+ GridCacheVersion dhtVer = null;
- if (val != null) {
- dhtVer = val.get1();
+ if (val != null) {
+ dhtVer = val.get1();
- valMap.put(key, val);
- }
+ valMap.put(key, val);
+ }
- if (cand != null && !cand.reentry()) {
- if (req == null) {
- boolean clientFirst = false;
+ if (cand != null && !cand.reentry()) {
+ if (req == null) {
+ boolean clientFirst = false;
- if (first) {
- clientFirst = clientNode &&
- !topLocked &&
- (tx == null || !tx.hasRemoteLocks());
+ if (first) {
+ clientFirst = clientNode &&
+ !topLocked &&
+ (tx == null || !tx.hasRemoteLocks());
- first = false;
+ first = false;
+ }
+
+ req = new GridNearLockRequest(
+ cctx.cacheId(),
+ topVer,
+ cctx.nodeId(),
+ threadId,
+ futId,
+ lockVer,
+ inTx(),
+ implicitTx(),
+ implicitSingleTx(),
+ read,
+ retval,
+ isolation(),
+ isInvalidate(),
+ timeout,
+ mappedKeys.size(),
+ inTx() ? tx.size() : mappedKeys.size(),
+ inTx() && tx.syncCommit(),
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L,
+ skipStore,
+ keepBinary,
+ clientFirst,
+ cctx.deploymentEnabled());
+
+ mapping.request(req);
}
- distributedKeys.add(key);
-
- if (tx != null)
- tx.addKeyMapping(txKey, mapping.node());
-
- req.addKeyBytes(
- key,
+ req = new GridNearLockRequest(
+ cctx.cacheId(),
+ topVer,
+ cctx.nodeId(),
+ threadId,
+ futId,
+ lockVer,
+ inTx(),
+ implicitTx(),
+ implicitSingleTx(),
+ read,
retval,
- dhtVer, // Include DHT version to match remote DHT entry.
- cctx);
+ isolation(),
+ isInvalidate(),
+ timeout,
+ mappedKeys.size(),
+ inTx() ? tx.size() : mappedKeys.size(),
+ inTx() && tx.syncCommit(),
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L,
+ skipStore,
+ clientFirst,
+ cctx.deploymentEnabled());
+
+ mapping.request(req);
}
- explicit = inTx() && cand == null;
+ distributedKeys.add(key);
- if (explicit)
+ if (tx != null)
tx.addKeyMapping(txKey, mapping.node());
- break;
+ req.addKeyBytes(
+ key,
+ retval,
+ dhtVer, // Include DHT version to match remote DHT entry.
+ cctx);
}
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
- entry = null;
- }
- }
+ explicit = inTx() && cand == null;
- // Mark mapping explicit lock flag.
- if (explicit) {
- boolean marked = tx != null && tx.markExplicit(node.id());
+ if (explicit)
+ tx.addKeyMapping(txKey, mapping.node());
- assert tx == null || marked;
+ break;
}
- }
-
- if (inTx() && req != null)
- req.hasTransforms(tx.hasTransforms());
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
- if (!distributedKeys.isEmpty()) {
- mapping.distributedKeys(distributedKeys);
-
- hasRmtNodes |= !mapping.node().isLocal();
+ entry = null;
+ }
}
- else {
- assert mapping.request() == null;
- iter.remove();
+ // Mark mapping explicit lock flag.
+ if (explicit) {
+ boolean marked = tx != null && tx.markExplicit(node.id());
+
+ assert tx == null || marked;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 6f92204,f16573d..832cc3d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -146,10 -150,9 +146,13 @@@ public final class GridNearLockFuture e
/** Skip store flag. */
private final boolean skipStore;
+ /** Mappings to proceed. */
+ @GridToStringExclude
+ private Queue<GridNearLockMapping> mappings;
+
+ /** Keep binary context flag. */
+ private final boolean keepBinary;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@@ -848,221 -845,199 +853,222 @@@
assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
- ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
+ synchronized (this) {
+ mappings = new ArrayDeque<>();
- // Assign keys to primary nodes.
- GridNearLockMapping map = null;
+ // Assign keys to primary nodes.
+ GridNearLockMapping map = null;
- for (KeyCacheObject key : keys) {
- GridNearLockMapping updated = map(key, map, topVer);
+ for (KeyCacheObject key : keys) {
+ GridNearLockMapping updated = map(
+ key,
+ map,
+ topVer);
- // If new mapping was created, add to collection.
- if (updated != map) {
- mappings.add(updated);
+ // If new mapping was created, add to collection.
+ if (updated != map) {
+ mappings.add(updated);
- if (tx != null && updated.node().isLocal())
- tx.nearLocallyMapped(true);
+ if (tx != null && updated.node().isLocal())
+ tx.nearLocallyMapped(true);
+ }
+
+ map = updated;
}
- map = updated;
- }
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Abandoning (re)map because future is done: " + this);
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done: " + this);
+ return;
+ }
- return;
- }
+ if (log.isDebugEnabled())
+ log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
- if (log.isDebugEnabled())
- log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
+ boolean first = true;
- boolean first = true;
+ // Create mini futures.
+ for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
+ GridNearLockMapping mapping = iter.next();
- // Create mini futures.
- for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
- GridNearLockMapping mapping = iter.next();
+ ClusterNode node = mapping.node();
+ Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
- ClusterNode node = mapping.node();
- Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
+ assert !mappedKeys.isEmpty();
- assert !mappedKeys.isEmpty();
+ GridNearLockRequest req = null;
- GridNearLockRequest req = null;
+ Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
- Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
+ boolean explicit = false;
- boolean explicit = false;
+ for (KeyCacheObject key : mappedKeys) {
+ IgniteTxKey txKey = cctx.txKey(key);
- for (KeyCacheObject key : mappedKeys) {
- IgniteTxKey txKey = cctx.txKey(key);
+ while (true) {
+ GridNearCacheEntry entry = null;
- while (true) {
- GridNearCacheEntry entry = null;
+ try {
+ entry = cctx.near().entryExx(
+ key,
+ topVer);
- try {
- entry = cctx.near().entryExx(key, topVer);
+ if (!cctx.isAll(
+ entry,
+ filter)) {
+ if (log.isDebugEnabled())
+ log.debug("Entry being locked did not pass filter (will not lock): " + entry);
- if (!cctx.isAll(entry, filter)) {
- if (log.isDebugEnabled())
- log.debug("Entry being locked did not pass filter (will not lock): " + entry);
+ onComplete(
+ false,
+ false);
- onComplete(false, false);
+ return;
+ }
- return;
- }
+ // Removed exception may be thrown here.
+ GridCacheMvccCandidate cand = addEntry(
+ topVer,
+ entry,
+ node.id());
- // Removed exception may be thrown here.
- GridCacheMvccCandidate cand = addEntry(topVer, entry, node.id());
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Abandoning (re)map because future is done after addEntry attempt " +
+ "[fut=" + this + ", entry=" + entry + ']');
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done after addEntry attempt " +
- "[fut=" + this + ", entry=" + entry + ']');
+ return;
+ }
- return;
- }
+ if (cand != null) {
+ if (tx == null && !cand.reentry())
+ cctx.mvcc().addExplicitLock(
+ threadId,
+ cand,
+ topVer);
- if (cand != null) {
- if (tx == null && !cand.reentry())
- cctx.mvcc().addExplicitLock(threadId, cand, topVer);
+ IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
- IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
+ if (val == null) {
+ GridDhtCacheEntry dhtEntry = dht().peekExx(key);
- if (val == null) {
- GridDhtCacheEntry dhtEntry = dht().peekExx(key);
+ try {
+ if (dhtEntry != null)
+ val = dhtEntry.versionedValue(topVer);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ assert dhtEntry.obsolete() : dhtEntry;
- try {
- if (dhtEntry != null)
- val = dhtEntry.versionedValue(topVer);
+ if (log.isDebugEnabled())
+ log.debug("Got removed exception for DHT entry in map (will ignore): "
+ + dhtEntry);
+ }
}
- catch (GridCacheEntryRemovedException ignored) {
- assert dhtEntry.obsolete() : " Got removed exception for non-obsolete entry: "
- + dhtEntry;
- if (log.isDebugEnabled())
- log.debug("Got removed exception for DHT entry in map (will ignore): "
- + dhtEntry);
- }
- }
+ GridCacheVersion dhtVer = null;
- GridCacheVersion dhtVer = null;
+ if (val != null) {
+ dhtVer = val.get1();
- if (val != null) {
- dhtVer = val.get1();
+ valMap.put(
+ key,
+ val);
+ }
- valMap.put(key, val);
- }
+ if (!cand.reentry()) {
+ if (req == null) {
+ boolean clientFirst = false;
- if (!cand.reentry()) {
- if (req == null) {
- boolean clientFirst = false;
+ if (first) {
+ clientFirst = clientNode &&
+ !topLocked &&
+ (tx == null || !tx.hasRemoteLocks());
- if (first) {
- clientFirst = clientNode &&
- !topLocked &&
- (tx == null || !tx.hasRemoteLocks());
+ first = false;
+ }
- first = false;
+ req = new GridNearLockRequest(
+ cctx.cacheId(),
+ topVer,
+ cctx.nodeId(),
+ threadId,
+ futId,
+ lockVer,
+ inTx(),
+ implicitTx(),
+ implicitSingleTx(),
+ read,
+ retval,
+ isolation(),
+ isInvalidate(),
+ timeout,
+ mappedKeys.size(),
+ inTx() ? tx.size() : mappedKeys.size(),
+ inTx() && tx.syncCommit(),
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L,
+ skipStore,
++ keepBinary,
+ clientFirst,
+ cctx.deploymentEnabled());
+
+ mapping.request(req);
}
- req = new GridNearLockRequest(
- cctx.cacheId(),
- topVer,
- cctx.nodeId(),
- threadId,
- futId,
- lockVer,
- inTx(),
- implicitTx(),
- implicitSingleTx(),
- read,
- retval,
- isolation(),
- isInvalidate(),
- timeout,
- mappedKeys.size(),
- inTx() ? tx.size() : mappedKeys.size(),
- inTx() && tx.syncCommit(),
- inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0,
- read ? accessTtl : -1L,
- skipStore,
- keepBinary,
- clientFirst,
- cctx.deploymentEnabled());
-
- mapping.request(req);
- }
+ distributedKeys.add(key);
- distributedKeys.add(key);
+ if (tx != null)
+ tx.addKeyMapping(
+ txKey,
+ mapping.node());
- if (tx != null)
- tx.addKeyMapping(txKey, mapping.node());
+ req.addKeyBytes(
+ key,
+ retval && dhtVer == null,
+ dhtVer,
+ // Include DHT version to match remote DHT entry.
+ cctx);
+ }
- req.addKeyBytes(
- key,
- retval && dhtVer == null,
- dhtVer, // Include DHT version to match remote DHT entry.
- cctx);
+ if (cand.reentry())
+ explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
}
-
- if (cand.reentry())
+ else
+ // Ignore reentries within transactions.
explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
- }
- else
- // Ignore reentries within transactions.
- explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
- if (explicit)
- tx.addKeyMapping(txKey, mapping.node());
+ if (explicit)
+ tx.addKeyMapping(
+ txKey,
+ mapping.node());
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
- if (log.isDebugEnabled())
- log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+ }
}
- }
- // Mark mapping explicit lock flag.
- if (explicit) {
- boolean marked = tx != null && tx.markExplicit(node.id());
+ // Mark mapping explicit lock flag.
+ if (explicit) {
+ boolean marked = tx != null && tx.markExplicit(node.id());
- assert tx == null || marked;
+ assert tx == null || marked;
+ }
}
- }
- if (!distributedKeys.isEmpty())
- mapping.distributedKeys(distributedKeys);
- else {
- assert mapping.request() == null;
+ if (!distributedKeys.isEmpty())
+ mapping.distributedKeys(distributedKeys);
+ else {
+ assert mapping.request() == null;
- iter.remove();
+ iter.remove();
+ }
}
}
@@@ -1454,165 -1440,158 +1461,166 @@@
* @param res Result callback.
*/
void onResult(GridNearLockResponse res) {
- if (rcvRes.compareAndSet(false, true)) {
- if (res.error() != null) {
- if (log.isDebugEnabled())
- log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
- ", res=" + res + ']');
+ synchronized (this) {
+ if (!rcvRes)
+ rcvRes = true;
+ else
+ return;
+ }
- // Fail.
- if (res.error() instanceof GridCacheLockTimeoutException)
- onDone(false);
- else
- onDone(res.error());
+ if (res.error() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
+ ", res=" + res + ']');
- return;
- }
+ // Fail.
+ if (res.error() instanceof GridCacheLockTimeoutException)
+ onDone(false);
+ else
+ onDone(res.error());
- if (res.clientRemapVersion() != null) {
- assert cctx.kernalContext().clientNode();
+ return;
+ }
- IgniteInternalFuture<?> affFut =
- cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
- if (affFut != null && !affFut.isDone()) {
- affFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- try {
- fut.get();
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
- remap();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- finally {
- cctx.shared().txContextReset();
- }
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ fut.get();
+
+ remap();
}
- });
- }
- else
- remap();
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ finally {
+ cctx.shared().txContextReset();
+ }
+ }
+ });
}
- else {
- int i = 0;
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+ AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer;
- for (KeyCacheObject k : keys) {
- while (true) {
- GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+ for (KeyCacheObject k : keys) {
+ while (true) {
+ GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
- try {
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ try {
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
- return;
- }
+ return;
+ }
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
- CacheObject oldVal = entry.rawGet();
- boolean hasOldVal = false;
- CacheObject newVal = res.value(i);
+ CacheObject oldVal = entry.rawGet();
+ boolean hasOldVal = false;
+ CacheObject newVal = res.value(i);
- boolean readRecordable = false;
+ boolean readRecordable = false;
- if (retval) {
- readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+ if (retval) {
+ readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
- if (readRecordable)
- hasOldVal = entry.hasValue();
- }
+ if (readRecordable)
+ hasOldVal = entry.hasValue();
+ }
- GridCacheVersion dhtVer = res.dhtVersion(i);
- GridCacheVersion mappedVer = res.mappedVersion(i);
+ GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion mappedVer = res.mappedVersion(i);
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
- oldVal = oldValTup.get2();
- }
+ oldVal = oldValTup.get2();
}
+ }
- // Lock is held at this point, so we can set the
- // returned value if any.
- entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+ // Lock is held at this point, so we can set the
+ // returned value if any.
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
- if (inTx()) {
- tx.hasRemoteLocks(true);
+ if (inTx()) {
+ tx.hasRemoteLocks(true);
- if (implicitTx() && tx.onePhaseCommit()) {
- boolean pass = res.filterResult(i);
+ if (implicitTx() && tx.onePhaseCommit()) {
+ boolean pass = res.filterResult(i);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
- }
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
}
+ }
- entry.readyNearLock(lockVer,
- mappedVer,
- res.committedVersions(),
- res.rolledbackVersions(),
- res.pending());
-
- if (retval) {
- if (readRecordable)
- cctx.events().addEvent(
- entry.partition(),
- entry.key(),
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- oldVal,
- hasOldVal,
- CU.subjectId(tx, cctx.shared()),
- null,
- inTx() ? tx.resolveTaskName() : null,
- keepBinary);
-
- if (cctx.cache().configuration().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(false);
- }
+ entry.readyNearLock(lockVer,
+ mappedVer,
+ res.committedVersions(),
+ res.rolledbackVersions(),
+ res.pending());
+
+ if (retval) {
+ if (readRecordable)
+ cctx.events().addEvent(
+ entry.partition(),
+ entry.key(),
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ oldVal,
+ hasOldVal,
+ CU.subjectId(tx, cctx.shared()),
+ null,
- inTx() ? tx.resolveTaskName() : null);
++ inTx() ? tx.resolveTaskName() : null,
++ keepBinary);
+
+ if (cctx.cache().configuration().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(false);
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- break; // Inner while loop.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to add candidates because entry was removed (will renew).");
+ break; // Inner while loop.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to add candidates because entry was removed (will renew).");
+ synchronized (GridNearLockFuture.this) {
// Replace old entry with new one.
- entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ entries.set(i,
+ (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
}
}
-
- i++;
}
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ i++;
+ }
- onDone(true);
+ try {
+ proceedMapping();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
}
+
+ onDone(true);
}
}