You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/24 10:18:22 UTC
[11/50] [abbrv] ignite git commit: debugging slowdowns
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 76f2fbe..6f92204 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -57,17 +58,14 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
@@ -109,7 +107,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
private boolean retval;
/** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
+ private volatile Throwable err;
/** Timed out flag. */
private volatile boolean timedOut;
@@ -129,8 +127,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
private GridNearTxLocal tx;
/** Topology snapshot to operate on. */
- private AtomicReference<AffinityTopologyVersion> topVer =
- new AtomicReference<>();
+ private volatile AffinityTopologyVersion topVer;
/** Map of current values. */
private Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap;
@@ -138,9 +135,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/** Trackable flag. */
private boolean trackable = true;
- /** Mutex. */
- private final Object mux = new Object();
-
/** Keys locked so far. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@GridToStringExclude
@@ -152,6 +146,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/** Skip store flag. */
private final boolean skipStore;
+ /** Mappings to proceed. */
+ @GridToStringExclude
+ private Queue<GridNearLockMapping> mappings;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@ -206,7 +204,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
cctx.time().addTimeoutObject(timeoutObj);
}
- valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
+ valMap = new ConcurrentHashMap8<>();
}
/** {@inheritDoc} */
@@ -217,10 +215,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/**
* @return Entries.
*/
- public List<GridDistributedCacheEntry> entriesCopy() {
- synchronized (mux) {
- return new ArrayList<>(entries);
- }
+ public synchronized List<GridDistributedCacheEntry> entriesCopy() {
+ return new ArrayList<>(entries);
}
/**
@@ -313,6 +309,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
GridNearCacheEntry entry,
UUID dhtNodeId
) throws GridCacheEntryRemovedException {
+ assert Thread.holdsLock(this);
+
// Check if lock acquisition is timed out.
if (timedOut)
return null;
@@ -335,9 +333,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
txEntry.cached(entry);
}
- synchronized (mux) {
- entries.add(entry);
- }
+ entries.add(entry);
if (c == null && timeout < 0) {
if (log.isDebugEnabled())
@@ -525,7 +521,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @param t Error.
*/
private void onError(Throwable t) {
- err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t);
+ synchronized (this) {
+ if (err == null)
+ err = t;
+ }
}
/**
@@ -572,35 +571,39 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
*/
private boolean checkLocks() {
if (!isDone() && initialized() && !hasPending()) {
- for (int i = 0; i < entries.size(); i++) {
- while (true) {
- GridCacheEntryEx cached = entries.get(i);
+ synchronized (this) {
+ for (int i = 0; i < entries.size(); i++) {
+ while (true) {
+ GridCacheEntryEx cached = entries.get(i);
- try {
- if (!locked(cached)) {
- if (log.isDebugEnabled())
- log.debug("Lock is still not acquired for entry (will keep waiting) [entry=" +
- cached + ", fut=" + this + ']');
+ try {
+ if (!locked(cached)) {
+ if (log.isDebugEnabled())
+ log.debug("Lock is still not acquired for entry (will keep waiting) [entry=" +
+ cached + ", fut=" + this + ']');
- return false;
- }
+ return false;
+ }
- break;
- }
- // Possible in concurrent cases, when owner is changed after locks
- // have been released or cancelled.
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached);
+ break;
+ }
+ // Possible in concurrent cases, when owner is changed after locks
+ // have been released or cancelled.
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached);
- // Replace old entry with new one.
- entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(cached.key()));
+ // Replace old entry with new one.
+ entries.set(
+ i,
+ (GridDistributedCacheEntry)cctx.cache().entryEx(cached.key()));
+ }
}
}
- }
- if (log.isDebugEnabled())
- log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]");
+ if (log.isDebugEnabled())
+ log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]");
+ }
onComplete(true, true);
@@ -627,7 +630,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (isDone() || (err == null && success && !checkLocks()))
return false;
- this.err.compareAndSet(null, err instanceof GridCacheLockTimeoutException ? null : err);
+ if (err != null && !(err instanceof GridCacheLockTimeoutException))
+ onError(err);
if (err != null)
success = false;
@@ -653,7 +657,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (tx != null)
cctx.tm().txContext(tx);
- if (super.onDone(success, err.get())) {
+ if (super.onDone(success, err)) {
if (log.isDebugEnabled())
log.debug("Completing future: " + this);
@@ -730,7 +734,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
// Continue mapping on the same topology version as it was before.
- this.topVer.compareAndSet(null, topVer);
+ if (this.topVer == null)
+ this.topVer = topVer;
map(keys, false, true);
@@ -749,7 +754,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
*
* @param remap Remap flag.
*/
- void mapOnTopology(final boolean remap) {
+ synchronized void mapOnTopology(final boolean remap) {
// We must acquire topology snapshot from the topology version future.
cctx.topology().readLock();
@@ -778,13 +783,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (tx != null)
tx.onRemap(topVer);
- this.topVer.set(topVer);
+ this.topVer = topVer;
}
else {
if (tx != null)
tx.topologyVersion(topVer);
- this.topVer.compareAndSet(null, topVer);
+ if (this.topVer == null)
+ this.topVer = topVer;
}
map(keys, remap, false);
@@ -825,7 +831,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
*/
private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) {
try {
- AffinityTopologyVersion topVer = this.topVer.get();
+ AffinityTopologyVersion topVer = this.topVer;
assert topVer != null;
@@ -842,204 +848,227 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
- ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
+ synchronized (this) {
+ mappings = new ArrayDeque<>();
- // Assign keys to primary nodes.
- GridNearLockMapping map = null;
+ // Assign keys to primary nodes.
+ GridNearLockMapping map = null;
- for (KeyCacheObject key : keys) {
- GridNearLockMapping updated = map(key, map, topVer);
+ for (KeyCacheObject key : keys) {
+ GridNearLockMapping updated = map(
+ key,
+ map,
+ topVer);
- // If new mapping was created, add to collection.
- if (updated != map) {
- mappings.add(updated);
+ // If new mapping was created, add to collection.
+ if (updated != map) {
+ mappings.add(updated);
- if (tx != null && updated.node().isLocal())
- tx.nearLocallyMapped(true);
+ if (tx != null && updated.node().isLocal())
+ tx.nearLocallyMapped(true);
+ }
+
+ map = updated;
}
- map = updated;
- }
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Abandoning (re)map because future is done: " + this);
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done: " + this);
+ return;
+ }
- return;
- }
+ if (log.isDebugEnabled())
+ log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
- if (log.isDebugEnabled())
- log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
+ boolean first = true;
- boolean first = true;
+ // Create mini futures.
+ for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
+ GridNearLockMapping mapping = iter.next();
- // Create mini futures.
- for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
- GridNearLockMapping mapping = iter.next();
+ ClusterNode node = mapping.node();
+ Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
- ClusterNode node = mapping.node();
- Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
+ assert !mappedKeys.isEmpty();
- assert !mappedKeys.isEmpty();
+ GridNearLockRequest req = null;
- GridNearLockRequest req = null;
+ Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
- Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
+ boolean explicit = false;
- boolean explicit = false;
+ for (KeyCacheObject key : mappedKeys) {
+ IgniteTxKey txKey = cctx.txKey(key);
- for (KeyCacheObject key : mappedKeys) {
- IgniteTxKey txKey = cctx.txKey(key);
+ while (true) {
+ GridNearCacheEntry entry = null;
- while (true) {
- GridNearCacheEntry entry = null;
+ try {
+ entry = cctx.near().entryExx(
+ key,
+ topVer);
- try {
- entry = cctx.near().entryExx(key, topVer);
+ if (!cctx.isAll(
+ entry,
+ filter)) {
+ if (log.isDebugEnabled())
+ log.debug("Entry being locked did not pass filter (will not lock): " + entry);
- if (!cctx.isAll(entry, filter)) {
- if (log.isDebugEnabled())
- log.debug("Entry being locked did not pass filter (will not lock): " + entry);
+ onComplete(
+ false,
+ false);
- onComplete(false, false);
+ return;
+ }
- return;
- }
+ // Removed exception may be thrown here.
+ GridCacheMvccCandidate cand = addEntry(
+ topVer,
+ entry,
+ node.id());
- // Removed exception may be thrown here.
- GridCacheMvccCandidate cand = addEntry(topVer, entry, node.id());
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Abandoning (re)map because future is done after addEntry attempt " +
+ "[fut=" + this + ", entry=" + entry + ']');
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done after addEntry attempt " +
- "[fut=" + this + ", entry=" + entry + ']');
+ return;
+ }
- return;
- }
+ if (cand != null) {
+ if (tx == null && !cand.reentry())
+ cctx.mvcc().addExplicitLock(
+ threadId,
+ cand,
+ topVer);
- if (cand != null) {
- if (tx == null && !cand.reentry())
- cctx.mvcc().addExplicitLock(threadId, cand, topVer);
+ IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
- IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
+ if (val == null) {
+ GridDhtCacheEntry dhtEntry = dht().peekExx(key);
- if (val == null) {
- GridDhtCacheEntry dhtEntry = dht().peekExx(key);
+ try {
+ if (dhtEntry != null)
+ val = dhtEntry.versionedValue(topVer);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ assert dhtEntry.obsolete() : dhtEntry;
- try {
- if (dhtEntry != null)
- val = dhtEntry.versionedValue(topVer);
+ if (log.isDebugEnabled())
+ log.debug("Got removed exception for DHT entry in map (will ignore): "
+ + dhtEntry);
+ }
}
- catch (GridCacheEntryRemovedException ignored) {
- assert dhtEntry.obsolete() : " Got removed exception for non-obsolete entry: "
- + dhtEntry;
- if (log.isDebugEnabled())
- log.debug("Got removed exception for DHT entry in map (will ignore): "
- + dhtEntry);
- }
- }
+ GridCacheVersion dhtVer = null;
- GridCacheVersion dhtVer = null;
+ if (val != null) {
+ dhtVer = val.get1();
- if (val != null) {
- dhtVer = val.get1();
+ valMap.put(
+ key,
+ val);
+ }
- valMap.put(key, val);
- }
+ if (!cand.reentry()) {
+ if (req == null) {
+ boolean clientFirst = false;
- if (!cand.reentry()) {
- if (req == null) {
- boolean clientFirst = false;
+ if (first) {
+ clientFirst = clientNode &&
+ !topLocked &&
+ (tx == null || !tx.hasRemoteLocks());
- if (first) {
- clientFirst = clientNode &&
- !topLocked &&
- (tx == null || !tx.hasRemoteLocks());
+ first = false;
+ }
- first = false;
+ req = new GridNearLockRequest(
+ cctx.cacheId(),
+ topVer,
+ cctx.nodeId(),
+ threadId,
+ futId,
+ lockVer,
+ inTx(),
+ implicitTx(),
+ implicitSingleTx(),
+ read,
+ retval,
+ isolation(),
+ isInvalidate(),
+ timeout,
+ mappedKeys.size(),
+ inTx() ? tx.size() : mappedKeys.size(),
+ inTx() && tx.syncCommit(),
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L,
+ skipStore,
+ clientFirst,
+ cctx.deploymentEnabled());
+
+ mapping.request(req);
}
- req = new GridNearLockRequest(
- cctx.cacheId(),
- topVer,
- cctx.nodeId(),
- threadId,
- futId,
- lockVer,
- inTx(),
- implicitTx(),
- implicitSingleTx(),
- read,
- retval,
- isolation(),
- isInvalidate(),
- timeout,
- mappedKeys.size(),
- inTx() ? tx.size() : mappedKeys.size(),
- inTx() && tx.syncCommit(),
- inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0,
- read ? accessTtl : -1L,
- skipStore,
- clientFirst,
- cctx.deploymentEnabled());
-
- mapping.request(req);
- }
+ distributedKeys.add(key);
- distributedKeys.add(key);
+ if (tx != null)
+ tx.addKeyMapping(
+ txKey,
+ mapping.node());
- if (tx != null)
- tx.addKeyMapping(txKey, mapping.node());
+ req.addKeyBytes(
+ key,
+ retval && dhtVer == null,
+ dhtVer,
+ // Include DHT version to match remote DHT entry.
+ cctx);
+ }
- req.addKeyBytes(
- key,
- retval && dhtVer == null,
- dhtVer, // Include DHT version to match remote DHT entry.
- cctx);
+ if (cand.reentry())
+ explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
}
-
- if (cand.reentry())
+ else
+ // Ignore reentries within transactions.
explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
- }
- else
- // Ignore reentries within transactions.
- explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
- if (explicit)
- tx.addKeyMapping(txKey, mapping.node());
+ if (explicit)
+ tx.addKeyMapping(
+ txKey,
+ mapping.node());
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
- if (log.isDebugEnabled())
- log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+ }
}
- }
- // Mark mapping explicit lock flag.
- if (explicit) {
- boolean marked = tx != null && tx.markExplicit(node.id());
+ // Mark mapping explicit lock flag.
+ if (explicit) {
+ boolean marked = tx != null && tx.markExplicit(node.id());
- assert tx == null || marked;
+ assert tx == null || marked;
+ }
}
- }
- if (!distributedKeys.isEmpty())
- mapping.distributedKeys(distributedKeys);
- else {
- assert mapping.request() == null;
+ if (!distributedKeys.isEmpty())
+ mapping.distributedKeys(distributedKeys);
+ else {
+ assert mapping.request() == null;
- iter.remove();
+ iter.remove();
+ }
}
}
cctx.mvcc().recheckPendingLocks();
- proceedMapping(mappings);
+ proceedMapping();
}
catch (IgniteCheckedException ex) {
onError(ex);
@@ -1050,13 +1079,16 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* Gets next near lock mapping and either acquires dht locks locally or sends near lock request to
* remote primary node.
*
- * @param mappings Queue of mappings.
* @throws IgniteCheckedException If mapping can not be completed.
*/
@SuppressWarnings("unchecked")
- private void proceedMapping(final ConcurrentLinkedDeque8<GridNearLockMapping> mappings)
+ private void proceedMapping()
throws IgniteCheckedException {
- GridNearLockMapping map = mappings.poll();
+ GridNearLockMapping map;
+
+ synchronized (this) {
+ map = mappings.poll();
+ }
// If there are no more mappings to process, complete the future.
if (map == null)
@@ -1139,7 +1171,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
// Lock is held at this point, so we can set the
// returned value if any.
- entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer.get());
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
res.rolledbackVersions(), res.pending());
@@ -1181,9 +1213,11 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
log.debug("Failed to add candidates because entry was " +
"removed (will renew).");
- // Replace old entry with new one.
- entries.set(i, (GridDistributedCacheEntry)
- cctx.cache().entryEx(entry.key()));
+ synchronized (GridNearLockFuture.this) {
+ // Replace old entry with new one.
+ entries.set(i,
+ (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ }
}
}
@@ -1191,7 +1225,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
// Proceed and add new future (if any) before completing embedded future.
- proceedMapping(mappings);
+ proceedMapping();
}
catch (IgniteCheckedException ex) {
onError(ex);
@@ -1205,7 +1239,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
fut));
}
else {
- final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings);
+ final MiniFuture fut = new MiniFuture(node, mappedKeys);
req.miniId(fut.futureId());
@@ -1302,7 +1336,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
"(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
- topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+ topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
return topEx;
}
@@ -1354,23 +1388,19 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
@GridToStringInclude
private Collection<KeyCacheObject> keys;
- /** Mappings to proceed. */
- @GridToStringExclude
- private ConcurrentLinkedDeque8<GridNearLockMapping> mappings;
-
/** */
- private AtomicBoolean rcvRes = new AtomicBoolean(false);
+ private boolean rcvRes;
/**
* @param node Node.
* @param keys Keys.
- * @param mappings Mappings to proceed.
*/
- MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys,
- ConcurrentLinkedDeque8<GridNearLockMapping> mappings) {
+ MiniFuture(
+ ClusterNode node,
+ Collection<KeyCacheObject> keys
+ ) {
this.node = node;
this.keys = keys;
- this.mappings = mappings;
}
/**
@@ -1395,197 +1425,194 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
/**
- * @param e Error.
- */
- void onResult(Throwable e) {
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
- }
- else
- U.warn(log, "Received error after another result has been processed [fut=" + GridNearLockFuture.this +
- ", mini=" + this + ']', e);
- }
-
- /**
* @param e Node left exception.
*/
void onResult(ClusterTopologyCheckedException e) {
if (isDone())
return;
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+ synchronized (this) {
+ if (!rcvRes)
+ rcvRes = true;
+ else
+ return;
+ }
- if (tx != null)
- tx.removeMapping(node.id());
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
- // Primary node left the grid, so fail the future.
- GridNearLockFuture.this.onDone(newTopologyException(e, node.id()));
+ if (tx != null)
+ tx.removeMapping(node.id());
- onDone(true);
- }
+ // Primary node left the grid, so fail the future.
+ GridNearLockFuture.this.onDone(newTopologyException(e, node.id()));
+
+ onDone(true);
}
/**
* @param res Result callback.
*/
void onResult(GridNearLockResponse res) {
- if (rcvRes.compareAndSet(false, true)) {
- if (res.error() != null) {
- if (log.isDebugEnabled())
- log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
- ", res=" + res + ']');
+ synchronized (this) {
+ if (!rcvRes)
+ rcvRes = true;
+ else
+ return;
+ }
- // Fail.
- if (res.error() instanceof GridCacheLockTimeoutException)
- onDone(false);
- else
- onDone(res.error());
+ if (res.error() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
+ ", res=" + res + ']');
- return;
- }
+ // Fail.
+ if (res.error() instanceof GridCacheLockTimeoutException)
+ onDone(false);
+ else
+ onDone(res.error());
- if (res.clientRemapVersion() != null) {
- assert cctx.kernalContext().clientNode();
+ return;
+ }
- IgniteInternalFuture<?> affFut =
- cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
- if (affFut != null && !affFut.isDone()) {
- affFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- try {
- fut.get();
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
- remap();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- finally {
- cctx.shared().txContextReset();
- }
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ fut.get();
+
+ remap();
}
- });
- }
- else
- remap();
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ finally {
+ cctx.shared().txContextReset();
+ }
+ }
+ });
}
- else {
- int i = 0;
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+ AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer;
- for (KeyCacheObject k : keys) {
- while (true) {
- GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+ for (KeyCacheObject k : keys) {
+ while (true) {
+ GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
- try {
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ try {
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
- return;
- }
+ return;
+ }
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
- CacheObject oldVal = entry.rawGet();
- boolean hasOldVal = false;
- CacheObject newVal = res.value(i);
+ CacheObject oldVal = entry.rawGet();
+ boolean hasOldVal = false;
+ CacheObject newVal = res.value(i);
- boolean readRecordable = false;
+ boolean readRecordable = false;
- if (retval) {
- readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+ if (retval) {
+ readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
- if (readRecordable)
- hasOldVal = entry.hasValue();
- }
+ if (readRecordable)
+ hasOldVal = entry.hasValue();
+ }
- GridCacheVersion dhtVer = res.dhtVersion(i);
- GridCacheVersion mappedVer = res.mappedVersion(i);
+ GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion mappedVer = res.mappedVersion(i);
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
- oldVal = oldValTup.get2();
- }
+ oldVal = oldValTup.get2();
}
+ }
- // Lock is held at this point, so we can set the
- // returned value if any.
- entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+ // Lock is held at this point, so we can set the
+ // returned value if any.
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
- if (inTx()) {
- tx.hasRemoteLocks(true);
+ if (inTx()) {
+ tx.hasRemoteLocks(true);
- if (implicitTx() && tx.onePhaseCommit()) {
- boolean pass = res.filterResult(i);
+ if (implicitTx() && tx.onePhaseCommit()) {
+ boolean pass = res.filterResult(i);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
- }
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
}
+ }
- entry.readyNearLock(lockVer,
- mappedVer,
- res.committedVersions(),
- res.rolledbackVersions(),
- res.pending());
-
- if (retval) {
- if (readRecordable)
- cctx.events().addEvent(
- entry.partition(),
- entry.key(),
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- oldVal,
- hasOldVal,
- CU.subjectId(tx, cctx.shared()),
- null,
- inTx() ? tx.resolveTaskName() : null);
-
- if (cctx.cache().configuration().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(false);
- }
+ entry.readyNearLock(lockVer,
+ mappedVer,
+ res.committedVersions(),
+ res.rolledbackVersions(),
+ res.pending());
+
+ if (retval) {
+ if (readRecordable)
+ cctx.events().addEvent(
+ entry.partition(),
+ entry.key(),
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ oldVal,
+ hasOldVal,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ inTx() ? tx.resolveTaskName() : null);
+
+ if (cctx.cache().configuration().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(false);
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- break; // Inner while loop.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to add candidates because entry was removed (will renew).");
+ break; // Inner while loop.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to add candidates because entry was removed (will renew).");
+ synchronized (GridNearLockFuture.this) {
// Replace old entry with new one.
- entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ entries.set(i,
+ (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
}
}
-
- i++;
}
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ i++;
+ }
- onDone(true);
+ try {
+ proceedMapping();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
}
+
+ onDone(true);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
index b4f689c..6c8e388 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -35,7 +35,7 @@ public class GridNearLockMapping {
/** Collection of mapped keys. */
@GridToStringInclude
- private Collection<KeyCacheObject> mappedKeys = new LinkedList<>();
+ private final Collection<KeyCacheObject> mappedKeys = new ArrayList<>();
/** Near lock request. */
@GridToStringExclude
@@ -115,4 +115,4 @@ public class GridNearLockMapping {
public String toString() {
return S.toString(GridNearLockMapping.class, this);
}
-}
\ No newline at end of file
+}