You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/23 15:02:13 UTC
[02/17] ignite git commit: debugging slowdowns
debugging slowdowns
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e6d0ffe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e6d0ffe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e6d0ffe
Branch: refs/heads/ignite-1282
Commit: 5e6d0ffefb7d58cbc21bea651671d4de02abf622
Parents: 8e7e330
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Nov 20 19:03:40 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Nov 20 19:03:40 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/GridDhtGetFuture.java | 24 +-
.../distributed/dht/GridDhtLockFuture.java | 78 +-
.../colocated/GridDhtColocatedLockFuture.java | 618 ++++++++--------
.../distributed/near/GridNearGetFuture.java | 2 -
.../distributed/near/GridNearLockFuture.java | 719 ++++++++++---------
.../distributed/near/GridNearLockMapping.java | 6 +-
6 files changed, 748 insertions(+), 699 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 7108da6..6b696b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@@ -37,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -83,7 +83,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
private LinkedHashMap<KeyCacheObject, Boolean> keys;
/** Reserved partitions. */
- private Collection<GridDhtLocalPartition> parts = new GridLeanSet<>(5);
+ private Collection<GridDhtLocalPartition> parts = new HashSet<>();
/** Future ID. */
private IgniteUuid futId;
@@ -98,7 +98,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
private IgniteTxLocalEx tx;
/** Retries because ownership changed. */
- private Collection<Integer> retries = new GridLeanSet<>();
+ private Collection<Integer> retries;
/** Subject ID. */
private UUID subjId;
@@ -174,7 +174,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
/** {@inheritDoc} */
@Override public Collection<Integer> invalidPartitions() {
- return retries;
+ return retries == null ? Collections.<Integer>emptyList() : retries;
}
/**
@@ -210,8 +210,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
private void map(final LinkedHashMap<KeyCacheObject, Boolean> keys) {
GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
- if (!F.isEmpty(fut.invalidPartitions()))
+ if (!F.isEmpty(fut.invalidPartitions())) {
+ if (retries == null)
+ retries = new HashSet<>();
+
retries.addAll(fut.invalidPartitions());
+ }
add(new GridEmbeddedFuture<>(
new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
@@ -229,9 +233,13 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
int part = cctx.affinity().partition(key.getKey());
- if (!retries.contains(part)) {
- if (!map(key.getKey(), parts))
+ if (retries == null || !retries.contains(part)) {
+ if (!map(key.getKey(), parts)) {
+ if (retries == null)
+ retries = new HashSet<>();
+
retries.add(part);
+ }
else
mappedKeys.put(key.getKey(), key.getValue());
}
@@ -441,4 +449,4 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
private GridDhtCacheAdapter<K, V> cache() {
return (GridDhtCacheAdapter<K, V>)cctx.cache();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a7978c9..543acb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -51,8 +52,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -63,7 +62,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.NotNull;
@@ -123,7 +121,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
private boolean read;
/** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
+ private Throwable err;
/** Timed out flag. */
private volatile boolean timedOut;
@@ -142,19 +140,16 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
private GridDhtTxLocalAdapter tx;
/** All replies flag. */
- private AtomicBoolean mapped = new AtomicBoolean(false);
+ private boolean mapped;
/** */
- private Collection<Integer> invalidParts = new GridLeanSet<>();
+ private Collection<Integer> invalidParts;
/** Trackable flag. */
private boolean trackable = true;
- /** Mutex. */
- private final Object mux = new Object();
-
/** Pending locks. */
- private final Collection<KeyCacheObject> pendingLocks = new GridConcurrentHashSet<>();
+ private final Collection<KeyCacheObject> pendingLocks;
/** TTL for read operation. */
private long accessTtl;
@@ -231,6 +226,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
futId = IgniteUuid.randomUuid();
entries = new ArrayList<>(cnt);
+ pendingLocks = U.newHashSet(cnt);
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class);
@@ -244,7 +240,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/** {@inheritDoc} */
@Override public Collection<Integer> invalidPartitions() {
- return invalidParts;
+ return invalidParts == null ? Collections.<Integer>emptyList() : invalidParts;
}
/**
@@ -252,6 +248,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param invalidPart Partition to retry.
*/
void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
+ if (invalidParts == null)
+ invalidParts = new HashSet<>();
+
invalidParts.add(invalidPart);
// Register invalid partitions with transaction.
@@ -287,10 +286,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return Entries.
*/
- public Collection<GridDhtCacheEntry> entriesCopy() {
- synchronized (mux) {
- return new ArrayList<>(entries());
- }
+ public synchronized Collection<GridDhtCacheEntry> entriesCopy() {
+ return new ArrayList<>(entries());
}
/**
@@ -403,12 +400,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
return null;
}
- synchronized (mux) {
+ synchronized (this) {
entries.add(c == null || c.reentry() ? null : entry);
- }
- if (c != null && !c.reentry())
- pendingLocks.add(entry.key());
+ if (c != null && !c.reentry())
+ pendingLocks.add(entry.key());
+ }
// Double check if the future has already timed out.
if (timedOut) {
@@ -615,19 +612,17 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
}
/**
- * @param e Error.
- */
- public void onError(GridDistributedLockCancelledException e) {
- if (err.compareAndSet(null, e))
- onComplete(false);
- }
-
- /**
* @param t Error.
*/
public void onError(Throwable t) {
- if (err.compareAndSet(null, t))
- onComplete(false);
+ synchronized (this) {
+ if (err != null)
+ return;
+
+ err = t;
+ }
+
+ onComplete(false);
}
/**
@@ -667,7 +662,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]");
if (owner != null && owner.version().equals(lockVer)) {
- pendingLocks.remove(entry.key());
+ synchronized (this) {
+ pendingLocks.remove(entry.key());
+ }
if (checkLocks())
map(entries());
@@ -681,7 +678,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return {@code True} if locks have been acquired.
*/
- private boolean checkLocks() {
+ private synchronized boolean checkLocks() {
return pendingLocks.isEmpty();
}
@@ -713,7 +710,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (isDone() || (err == null && success && !checkLocks()))
return false;
- this.err.compareAndSet(null, err);
+ synchronized (this) {
+ if (this.err == null)
+ this.err = err;
+ }
return onComplete(success);
}
@@ -734,10 +734,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (tx != null)
cctx.tm().txContext(tx);
- if (err.get() == null)
+ if (err == null)
loadMissingFromStore();
- if (super.onDone(success, err.get())) {
+ if (super.onDone(success, err)) {
if (log.isDebugEnabled())
log.debug("Completing future: " + this);
@@ -778,11 +778,11 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param entries Entries.
*/
private void map(Iterable<GridDhtCacheEntry> entries) {
- if (!mapped.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Will not map DHT lock future (other thread is mapping): " + this);
+ synchronized (this) {
+ if (mapped)
+ return;
- return;
+ mapped = true;
}
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 8245d88..7e17efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -66,12 +66,10 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
@@ -113,7 +111,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
private boolean retval;
/** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
+ private volatile Throwable err;
/** Timeout object. */
@GridToStringExclude
@@ -130,7 +128,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
private GridNearTxLocal tx;
/** Topology snapshot to operate on. */
- private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>();
+ private volatile AffinityTopologyVersion topVer;
/** Map of current values. */
private Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap;
@@ -144,6 +142,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/** Skip store flag. */
private final boolean skipStore;
+ /** */
+ private Deque<GridNearLockMapping> mappings;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@ -196,7 +197,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
cctx.time().addTimeoutObject(timeoutObj);
}
- valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
+ valMap = new ConcurrentHashMap8<>();
}
/** {@inheritDoc} */
@@ -318,7 +319,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
false,
null);
- cand.topologyVersion(topVer.get());
+ cand.topologyVersion(topVer);
}
}
else {
@@ -338,12 +339,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
false,
null);
- cand.topologyVersion(topVer.get());
+ cand.topologyVersion(topVer);
}
else
cand = cand.reenter();
- cctx.mvcc().addExplicitLock(threadId, cand, topVer.get());
+ cctx.mvcc().addExplicitLock(threadId, cand, topVer);
}
return cand;
@@ -479,8 +480,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/**
* @param t Error.
*/
- private void onError(Throwable t) {
- err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t);
+ private synchronized void onError(Throwable t) {
+ if (err == null && !(t instanceof GridCacheLockTimeoutException))
+ err = t;
}
/** {@inheritDoc} */
@@ -499,7 +501,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (isDone())
return false;
- this.err.compareAndSet(null, err instanceof GridCacheLockTimeoutException ? null : err);
+ if (err != null)
+ onError(err);
if (err != null)
success = false;
@@ -525,7 +528,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (tx != null)
cctx.tm().txContext(tx);
- if (super.onDone(success, err.get())) {
+ if (super.onDone(success, err)) {
if (log.isDebugEnabled())
log.debug("Completing future: " + this);
@@ -617,7 +620,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
}
// Continue mapping on the same topology version as it was before.
- this.topVer.compareAndSet(null, topVer);
+ synchronized (this) {
+ if (this.topVer == null)
+ this.topVer = topVer;
+ }
map(keys, false, true);
@@ -666,13 +672,18 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (tx != null)
tx.onRemap(topVer);
- this.topVer.set(topVer);
+ synchronized (this) {
+ this.topVer = topVer;
+ }
}
else {
if (tx != null)
tx.topologyVersion(topVer);
- this.topVer.compareAndSet(null, topVer);
+ synchronized (this) {
+ if (this.topVer == null)
+ this.topVer = topVer;
+ }
}
map(keys, remap, false);
@@ -716,242 +727,256 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
*/
private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) {
try {
- AffinityTopologyVersion topVer = this.topVer.get();
+ map0(
+ keys,
+ remap,
+ topLocked);
+ }
+ catch (IgniteCheckedException ex) {
+ onDone(false, ex);
+ }
+ }
- assert topVer != null;
+ private synchronized void map0(
+ Collection<KeyCacheObject> keys,
+ boolean remap,
+ boolean topLocked
+ ) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = this.topVer;
- assert topVer.topologyVersion() > 0;
+ assert topVer != null;
- if (CU.affinityNodes(cctx, topVer).isEmpty()) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid): " + cctx.name()));
+ assert topVer.topologyVersion() > 0;
- return;
- }
+ if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid): " + cctx.name()));
- boolean clientNode = cctx.kernalContext().clientNode();
+ return;
+ }
- assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+ boolean clientNode = cctx.kernalContext().clientNode();
- // First assume this node is primary for all keys passed in.
- if (!clientNode && mapAsPrimary(keys, topVer))
- return;
+ assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
- Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
+ // First assume this node is primary for all keys passed in.
+ if (!clientNode && mapAsPrimary(keys, topVer))
+ return;
- // Assign keys to primary nodes.
- GridNearLockMapping map = null;
+ mappings = new ArrayDeque<>();
- for (KeyCacheObject key : keys) {
- GridNearLockMapping updated = map(key, map, topVer);
+ // Assign keys to primary nodes.
+ GridNearLockMapping map = null;
- // If new mapping was created, add to collection.
- if (updated != map) {
- mappings.add(updated);
+ for (KeyCacheObject key : keys) {
+ GridNearLockMapping updated = map(key, map, topVer);
- if (tx != null && updated.node().isLocal())
- tx.colocatedLocallyMapped(true);
- }
+ // If new mapping was created, add to collection.
+ if (updated != map) {
+ mappings.add(updated);
- map = updated;
+ if (tx != null && updated.node().isLocal())
+ tx.colocatedLocallyMapped(true);
}
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done: " + this);
-
- return;
- }
+ map = updated;
+ }
+ if (isDone()) {
if (log.isDebugEnabled())
- log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
+ log.debug("Abandoning (re)map because future is done: " + this);
+
+ return;
+ }
- boolean hasRmtNodes = false;
+ if (log.isDebugEnabled())
+ log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
- boolean first = true;
+ boolean hasRmtNodes = false;
- // Create mini futures.
- for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
- GridNearLockMapping mapping = iter.next();
+ boolean first = true;
- ClusterNode node = mapping.node();
- Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
+ // Create mini futures.
+ for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
+ GridNearLockMapping mapping = iter.next();
- boolean loc = node.equals(cctx.localNode());
+ ClusterNode node = mapping.node();
+ Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
- assert !mappedKeys.isEmpty();
+ boolean loc = node.equals(cctx.localNode());
- GridNearLockRequest req = null;
+ assert !mappedKeys.isEmpty();
- Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
+ GridNearLockRequest req = null;
- for (KeyCacheObject key : mappedKeys) {
- IgniteTxKey txKey = cctx.txKey(key);
+ Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
- GridDistributedCacheEntry entry = null;
+ for (KeyCacheObject key : mappedKeys) {
+ IgniteTxKey txKey = cctx.txKey(key);
- if (tx != null) {
- IgniteTxEntry txEntry = tx.entry(txKey);
+ GridDistributedCacheEntry entry = null;
- if (txEntry != null) {
- entry = (GridDistributedCacheEntry)txEntry.cached();
+ if (tx != null) {
+ IgniteTxEntry txEntry = tx.entry(txKey);
- if (entry != null && !(loc ^ entry.detached())) {
- entry = cctx.colocated().entryExx(key, topVer, true);
+ if (txEntry != null) {
+ entry = (GridDistributedCacheEntry)txEntry.cached();
- txEntry.cached(entry);
- }
+ if (entry != null && !(loc ^ entry.detached())) {
+ entry = cctx.colocated().entryExx(key, topVer, true);
+
+ txEntry.cached(entry);
}
}
+ }
- boolean explicit;
+ boolean explicit;
- while (true) {
- try {
- if (entry == null)
- entry = cctx.colocated().entryExx(key, topVer, true);
+ while (true) {
+ try {
+ if (entry == null)
+ entry = cctx.colocated().entryExx(key, topVer, true);
- if (!cctx.isAll(entry, filter)) {
- if (log.isDebugEnabled())
- log.debug("Entry being locked did not pass filter (will not lock): " + entry);
+ if (!cctx.isAll(entry, filter)) {
+ if (log.isDebugEnabled())
+ log.debug("Entry being locked did not pass filter (will not lock): " + entry);
- onComplete(false, false);
+ onComplete(false, false);
- return;
- }
+ return;
+ }
- assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
+ assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
- GridCacheMvccCandidate cand = addEntry(entry);
+ GridCacheMvccCandidate cand = addEntry(entry);
- // Will either return value from dht cache or null if this is a miss.
- IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
- ((GridDhtCacheEntry)entry).versionedValue(topVer);
+ // Will either return value from dht cache or null if this is a miss.
+ IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
+ ((GridDhtCacheEntry)entry).versionedValue(topVer);
- GridCacheVersion dhtVer = null;
+ GridCacheVersion dhtVer = null;
- if (val != null) {
- dhtVer = val.get1();
+ if (val != null) {
+ dhtVer = val.get1();
- valMap.put(key, val);
- }
+ valMap.put(key, val);
+ }
- if (cand != null && !cand.reentry()) {
- if (req == null) {
- boolean clientFirst = false;
-
- if (first) {
- clientFirst = clientNode &&
- !topLocked &&
- (tx == null || !tx.hasRemoteLocks());
-
- first = false;
- }
-
- req = new GridNearLockRequest(
- cctx.cacheId(),
- topVer,
- cctx.nodeId(),
- threadId,
- futId,
- lockVer,
- inTx(),
- implicitTx(),
- implicitSingleTx(),
- read,
- retval,
- isolation(),
- isInvalidate(),
- timeout,
- mappedKeys.size(),
- inTx() ? tx.size() : mappedKeys.size(),
- inTx() && tx.syncCommit(),
- inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0,
- read ? accessTtl : -1L,
- skipStore,
- clientFirst,
- cctx.deploymentEnabled());
-
- mapping.request(req);
- }
+ if (cand != null && !cand.reentry()) {
+ if (req == null) {
+ boolean clientFirst = false;
- distributedKeys.add(key);
+ if (first) {
+ clientFirst = clientNode &&
+ !topLocked &&
+ (tx == null || !tx.hasRemoteLocks());
- if (tx != null)
- tx.addKeyMapping(txKey, mapping.node());
+ first = false;
+ }
- req.addKeyBytes(
- key,
+ req = new GridNearLockRequest(
+ cctx.cacheId(),
+ topVer,
+ cctx.nodeId(),
+ threadId,
+ futId,
+ lockVer,
+ inTx(),
+ implicitTx(),
+ implicitSingleTx(),
+ read,
retval,
- dhtVer, // Include DHT version to match remote DHT entry.
- cctx);
+ isolation(),
+ isInvalidate(),
+ timeout,
+ mappedKeys.size(),
+ inTx() ? tx.size() : mappedKeys.size(),
+ inTx() && tx.syncCommit(),
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L,
+ skipStore,
+ clientFirst,
+ cctx.deploymentEnabled());
+
+ mapping.request(req);
}
- explicit = inTx() && cand == null;
+ distributedKeys.add(key);
- if (explicit)
+ if (tx != null)
tx.addKeyMapping(txKey, mapping.node());
- break;
+ req.addKeyBytes(
+ key,
+ retval,
+ dhtVer, // Include DHT version to match remote DHT entry.
+ cctx);
}
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
- entry = null;
- }
- }
+ explicit = inTx() && cand == null;
- // Mark mapping explicit lock flag.
- if (explicit) {
- boolean marked = tx != null && tx.markExplicit(node.id());
+ if (explicit)
+ tx.addKeyMapping(txKey, mapping.node());
- assert tx == null || marked;
+ break;
}
- }
-
- if (inTx() && req != null)
- req.hasTransforms(tx.hasTransforms());
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
- if (!distributedKeys.isEmpty()) {
- mapping.distributedKeys(distributedKeys);
-
- hasRmtNodes |= !mapping.node().isLocal();
+ entry = null;
+ }
}
- else {
- assert mapping.request() == null;
- iter.remove();
+ // Mark mapping explicit lock flag.
+ if (explicit) {
+ boolean marked = tx != null && tx.markExplicit(node.id());
+
+ assert tx == null || marked;
}
}
- if (hasRmtNodes) {
- trackable = true;
+ if (inTx() && req != null)
+ req.hasTransforms(tx.hasTransforms());
- if (!remap && !cctx.mvcc().addFuture(this))
- throw new IllegalStateException("Duplicate future ID: " + this);
+ if (!distributedKeys.isEmpty()) {
+ mapping.distributedKeys(distributedKeys);
+
+ hasRmtNodes |= !mapping.node().isLocal();
}
- else
- trackable = false;
+ else {
+ assert mapping.request() == null;
- proceedMapping(mappings);
+ iter.remove();
+ }
}
- catch (IgniteCheckedException ex) {
- onDone(false, ex);
+
+ if (hasRmtNodes) {
+ trackable = true;
+
+ if (!remap && !cctx.mvcc().addFuture(this))
+ throw new IllegalStateException("Duplicate future ID: " + this);
}
+ else
+ trackable = false;
+
+ proceedMapping();
}
/**
* Gets next near lock mapping and either acquires dht locks locally or sends near lock request to
* remote primary node.
*
- * @param mappings Queue of mappings.
* @throws IgniteCheckedException If mapping can not be completed.
*/
- private void proceedMapping(final Deque<GridNearLockMapping> mappings)
+ private void proceedMapping()
throws IgniteCheckedException {
- GridNearLockMapping map = mappings.poll();
+ GridNearLockMapping map;
+
+ synchronized (this) {
+ map = mappings.poll();
+ }
// If there are no more mappings to process, complete the future.
if (map == null)
@@ -965,9 +990,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
req.filter(filter, cctx);
if (node.isLocal())
- lockLocally(mappedKeys, req.topologyVersion(), mappings);
+ lockLocally(mappedKeys, req.topologyVersion());
else {
- final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings);
+ final MiniFuture fut = new MiniFuture(node, mappedKeys);
req.miniId(fut.futureId());
@@ -1016,15 +1041,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/**
* Locks given keys directly through dht cache.
- *
- * @param keys Collection of keys.
+ * @param keys Collection of keys.
* @param topVer Topology version to lock on.
- * @param mappings Optional collection of mappings to proceed locking.
*/
private void lockLocally(
final Collection<KeyCacheObject> keys,
- AffinityTopologyVersion topVer,
- @Nullable final Deque<GridNearLockMapping> mappings
+ AffinityTopologyVersion topVer
) {
if (log.isDebugEnabled())
log.debug("Before locally locking keys : " + keys);
@@ -1078,7 +1100,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
try {
// Proceed and add new future (if any) before completing embedded future.
if (mappings != null)
- proceedMapping(mappings);
+ proceedMapping();
}
catch (IgniteCheckedException ex) {
onError(ex);
@@ -1101,7 +1123,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed.
* @throws IgniteCheckedException If key cannot be added to mapping.
*/
- private boolean mapAsPrimary(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+ private boolean mapAsPrimary(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer)
+ throws IgniteCheckedException {
// Assign keys to primary nodes.
Collection<KeyCacheObject> distributedKeys = new ArrayList<>(keys.size());
@@ -1137,7 +1160,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
tx.addKeyMapping(cctx.txKey(key), cctx.localNode());
}
- lockLocally(distributedKeys, topVer, null);
+ lockLocally(distributedKeys, topVer);
}
return true;
@@ -1221,7 +1244,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
"(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
- topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+ topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
return topEx;
}
@@ -1275,19 +1298,18 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
private Deque<GridNearLockMapping> mappings;
/** */
- private AtomicBoolean rcvRes = new AtomicBoolean(false);
+ private boolean rcvRes;
/**
* @param node Node.
* @param keys Keys.
- * @param mappings Mappings to proceed.
*/
- MiniFuture(ClusterNode node,
- Collection<KeyCacheObject> keys,
- Deque<GridNearLockMapping> mappings) {
+ MiniFuture(
+ ClusterNode node,
+ Collection<KeyCacheObject> keys
+ ) {
this.node = node;
this.keys = keys;
- this.mappings = mappings;
}
/**
@@ -1312,159 +1334,153 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
}
/**
- * @param e Error.
- */
- void onResult(Throwable e) {
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
- }
- else
- U.warn(log, "Received error after another result has been processed [fut=" +
- GridDhtColocatedLockFuture.this + ", mini=" + this + ']', e);
- }
-
- /**
* @param e Node left exception.
*/
void onResult(ClusterTopologyCheckedException e) {
if (isDone())
return;
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+ synchronized (this) {
+ if (rcvRes)
+ return;
+
+ rcvRes = true;
+ }
- if (tx != null)
- tx.removeMapping(node.id());
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
- // Primary node left the grid, so fail the future.
- GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id()));
+ if (tx != null)
+ tx.removeMapping(node.id());
- onDone(true);
- }
+ // Primary node left the grid, so fail the future.
+ GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id()));
+
+ onDone(true);
}
/**
* @param res Result callback.
*/
void onResult(GridNearLockResponse res) {
- if (rcvRes.compareAndSet(false, true)) {
- if (res.error() != null) {
- if (log.isDebugEnabled())
- log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
- ", res=" + res + ']');
-
- // Fail.
- if (res.error() instanceof GridCacheLockTimeoutException)
- onDone(false);
- else
- onDone(res.error());
-
+ synchronized (this) {
+ if (rcvRes)
return;
- }
- if (res.clientRemapVersion() != null) {
- assert cctx.kernalContext().clientNode();
+ rcvRes = true;
+ }
- IgniteInternalFuture<?> affFut =
- cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+ if (res.error() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
+ ", res=" + res + ']');
- if (affFut != null && !affFut.isDone()) {
- affFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- try {
- fut.get();
+ // Fail.
+ if (res.error() instanceof GridCacheLockTimeoutException)
+ onDone(false);
+ else
+ onDone(res.error());
- remap();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- finally {
- cctx.shared().txContextReset();
- }
- }
- });
- }
- else
- remap();
- }
- else {
- int i = 0;
+ return;
+ }
- for (KeyCacheObject k : keys) {
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
- CacheObject newVal = res.value(i);
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
- GridCacheVersion dhtVer = res.dhtVersion(i);
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ fut.get();
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ remap();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ finally {
+ cctx.shared().txContextReset();
}
}
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- if (inTx()) {
- IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+ for (KeyCacheObject k : keys) {
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
- // In colocated cache we must receive responses only for detached entries.
- assert txEntry.cached().detached() : txEntry;
+ CacheObject newVal = res.value(i);
- txEntry.markLocked();
+ GridCacheVersion dhtVer = res.dhtVersion(i);
- GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
+ }
+ }
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ if (inTx()) {
+ IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
- return;
- }
+ // In colocated cache we must receive responses only for detached entries.
+ assert txEntry.cached().detached() : txEntry;
- // Set value to detached entry.
- entry.resetFromPrimary(newVal, dhtVer);
+ txEntry.markLocked();
- tx.hasRemoteLocks(true);
+ GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- }
- else
- cctx.mvcc().markExplicitOwner(cctx.txKey(k), threadId);
-
- if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- cctx.events().addEvent(cctx.affinity().partition(k),
- k,
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- null,
- false,
- CU.subjectId(tx, cctx.shared()),
- null,
- tx == null ? null : tx.resolveTaskName());
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
+
+ return;
}
- i++;
- }
+ // Set value to detached entry.
+ entry.resetFromPrimary(newVal, dhtVer);
- try {
- proceedMapping(mappings);
+ tx.hasRemoteLocks(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
}
- catch (IgniteCheckedException e) {
- onDone(e);
+ else
+ cctx.mvcc().markExplicitOwner(cctx.txKey(k), threadId);
+
+ if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ cctx.events().addEvent(cctx.affinity().partition(k),
+ k,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ null,
+ false,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ tx == null ? null : tx.resolveTaskName());
}
- onDone(true);
+ i++;
+ }
+
+ try {
+ proceedMapping();
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+
+ onDone(true);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index dfaa44e..4a030b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -60,7 +59,6 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;