You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/10/15 18:22:31 UTC
ignite git commit: Debugging slowdowns
Repository: ignite
Updated Branches:
refs/heads/ignite-1.4-slow-server-debug 4ce4ff198 -> 96c6bebac
Debugging slowdowns
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c6beba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c6beba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c6beba
Branch: refs/heads/ignite-1.4-slow-server-debug
Commit: 96c6bebac5a9f32efcdd448d9f13bc3e9071a367
Parents: 4ce4ff1
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Oct 15 19:21:33 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Oct 15 19:21:33 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/internal/Bench.java | 99 ++++++++++
.../processors/cache/GridCacheMvccManager.java | 111 +++++------
.../distributed/GridDistributedTxMapping.java | 8 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 28 +--
.../distributed/dht/GridDhtTxPrepareFuture.java | 29 +--
.../cache/distributed/dht/GridDhtTxRemote.java | 47 ++---
.../near/GridNearOptimisticTxPrepareFuture.java | 11 +-
.../near/GridNearTxFinishFuture.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 4 +-
.../cache/transactions/IgniteTxManager.java | 183 ++++++++++---------
.../ignite/internal/util/IgniteUuidCache.java | 6 +-
.../util/future/GridCompoundFuture.java | 175 ++++++++++++------
.../java/org/jsr166/ConcurrentHashMap8.java | 2 +-
.../java/org/jsr166/ConcurrentLinkedDeque8.java | 2 +-
.../org/jsr166/ConcurrentLinkedHashMap.java | 2 +-
15 files changed, 442 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/Bench.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Bench.java b/modules/core/src/main/java/org/apache/ignite/internal/Bench.java
new file mode 100644
index 0000000..994156e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/Bench.java
@@ -0,0 +1,99 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jsr166.LongAdder8;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ */
+public class Bench {
+ public static void main(String[] args) throws InterruptedException {
+ Ignition.start(config("1",
+ false));
+ Ignition.start(config("2",
+ false));
+
+ final boolean client = false;
+
+ final Ignite ignite = Ignition.start(config("0",
+ client));
+
+ final IgniteCache<Object, Object> cache =
+ ignite.getOrCreateCache(new CacheConfiguration<>()
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setBackups(1).setRebalanceMode(CacheRebalanceMode.SYNC)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC));
+
+ Thread.sleep(2000);
+
+
+ final LongAdder8 cnt = new LongAdder8();
+
+ final AtomicLong time = new AtomicLong(U.currentTimeMillis());
+
+ for (int i = 0; i < 3; i++) {
+ new Thread(
+ new Runnable() {
+ @Override public void run() {
+ for (;;) {
+ int key;
+
+ if (client)
+ key = ThreadLocalRandom.current().nextInt(10000);
+
+ else
+ for (;;) {
+ key = ThreadLocalRandom.current().nextInt(10000);
+
+ if (ignite.affinity(null).isPrimary(ignite.cluster().localNode(), key))
+ break;
+ }
+
+ cache.put(key, 0);
+
+ cnt.increment();
+
+ long l = time.get();
+ long now = U.currentTimeMillis();
+
+ if (now - l > 1000 && time.compareAndSet(l, now))
+ System.out.println("TPS [client=" + client + ", cnt=" + cnt.sumThenReset() + ']');
+ }
+ }
+ }
+ ).start();
+ }
+ }
+
+ private static IgniteConfiguration config(
+ String name,
+ boolean client
+ ) {
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ return new IgniteConfiguration().setGridName(name).setLocalHost("127.0.0.1").setClientMode(client).setCommunicationSpi(commSpi);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index e2d0302..bc0f634 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,19 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
@@ -57,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
@@ -64,6 +52,19 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
@@ -114,7 +115,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
/** Finish futures. */
- private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+ //private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
/** Logger. */
@SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -171,8 +172,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
else if (log.isDebugEnabled())
log.debug("Failed to find transaction for changed owner: " + owner);
- for (FinishLockFuture f : finishFuts)
- f.recheck(entry);
+// for (FinishLockFuture f : finishFuts)
+// f.recheck(entry);
}
/** {@inheritDoc} */
@@ -443,28 +444,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return true;
while (true) {
- Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(),
- new ConcurrentLinkedDeque8<GridCacheFuture<?>>() {
- /** */
- private int hash;
-
- {
- // Make sure that we add future to queue before
- // adding queue to the map of futures.
- add(fut);
- }
+ // TODO Properly optimize
+ Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) {
+ {
+ // Make sure that we add future to queue before
+ // adding queue to the map of futures.
+ add(fut);
+ }
- @Override public int hashCode() {
- if (hash == 0)
- hash = System.identityHashCode(this);
+ @Override public int hashCode() {
+ return System.identityHashCode(this);
+ }
- return hash;
- }
+ @Override public boolean equals(Object obj) {
+ return obj == this;
+ }
+ };
- @Override public boolean equals(Object obj) {
- return obj == this;
- }
- });
+ Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(),
+ col);
if (old != null) {
boolean empty, dup = false;
@@ -477,6 +475,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (!empty && !dup)
old.add(fut);
+
+ if (old.size() > 4)
+ System.out.println("Old: " + old);
}
// Future is being removed, so we force-remove here and try again.
@@ -630,7 +631,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (cacheCtx.isNear() || cacheCtx.isLocal())
return true;
- boolean ret = rmvLocks.add(ver);
+ boolean ret = true;//rmvLocks.add(ver);
if (log.isDebugEnabled())
log.debug("Added removed lock version: " + ver);
@@ -944,7 +945,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
X.println(">>> lockedSize: " + locked.size());
X.println(">>> futsSize: " + futs.size());
X.println(">>> near2dhtSize: " + near2dht.size());
- X.println(">>> finishFutsSize: " + finishFuts.size());
+// X.println(">>> finishFutsSize: " + finishFuts.size());
}
/**
@@ -964,10 +965,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
- for (FinishLockFuture fut : finishFuts) {
- if (fut.topologyVersion().equals(topVer))
- cands.putAll(fut.pendingLocks());
- }
+// for (FinishLockFuture fut : finishFuts) {
+// if (fut.topologyVersion().equals(topVer))
+// cands.putAll(fut.pendingLocks());
+// }
return cands;
}
@@ -1059,17 +1060,17 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
),
topVer);
- finishFuts.add(finishFut);
-
- finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> e) {
- finishFuts.remove(finishFut);
-
- // This call is required to make sure that the concurrent queue
- // clears memory occupied by internal nodes.
- finishFuts.peek();
- }
- });
+// finishFuts.add(finishFut);
+//
+// finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
+// @Override public void apply(IgniteInternalFuture<?> e) {
+// finishFuts.remove(finishFut);
+//
+// // This call is required to make sure that the concurrent queue
+// // clears memory occupied by internal nodes.
+// finishFuts.peek();
+// }
+// });
finishFut.recheck();
@@ -1083,8 +1084,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (exchLog.isDebugEnabled())
exchLog.debug("Rechecking pending locks for completion.");
- for (FinishLockFuture fut : finishFuts)
- fut.recheck();
+// for (FinishLockFuture fut : finishFuts)
+// fut.recheck();
}
/**
@@ -1245,4 +1246,4 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return S.toString(FinishLockFuture.class, this, super.toString());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 1e78ba2..2d2d935 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -23,13 +23,13 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -87,7 +87,7 @@ public class GridDistributedTxMapping implements Externalizable {
public GridDistributedTxMapping(ClusterNode node) {
this.node = node;
- entries = new GridConcurrentLinkedHashSet<>();
+ entries = new LinkedHashSet<>();
}
/**
@@ -297,7 +297,7 @@ public class GridDistributedTxMapping implements Externalizable {
*/
private void ensureModifiable() {
if (readOnly) {
- entries = new GridConcurrentLinkedHashSet<>(entries);
+ entries = new LinkedHashSet<>(entries);
readOnly = false;
}
@@ -330,4 +330,4 @@ public class GridDistributedTxMapping implements Externalizable {
@Override public String toString() {
return S.toString(GridDistributedTxMapping.class, this, "node", node.id());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8c7d985..dfeffb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -58,7 +59,6 @@ import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
@@ -78,10 +78,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** Near mappings. */
- protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> nearMap = new HashMap<>();
/** DHT mappings. */
- protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> dhtMap = new HashMap<>();
/** Mapped flag. */
protected AtomicBoolean mapped = new AtomicBoolean();
@@ -141,20 +141,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
int taskNameHash
) {
super(
- cctx,
- xidVer,
- implicit,
- implicitSingle,
- sys,
- plc,
- concurrency,
- isolation,
- timeout,
+ cctx,
+ xidVer,
+ implicit,
+ implicitSingle,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ timeout,
invalidate,
storeEnabled,
onePhaseCommit,
- txSize,
- subjId,
+ txSize,
+ subjId,
taskNameHash
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 165c8a9..89a435a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -17,18 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
@@ -79,6 +67,21 @@ import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
@@ -178,7 +181,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Keys that should be locked. */
@GridToStringInclude
- private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ private Set<IgniteTxKey> lockKeys = new HashSet<>();
/** Force keys future for correct transforms. */
private IgniteInternalFuture<?> forceKeysFut;
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index f8be2a7..3996d6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
@@ -39,11 +40,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedHashMap;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
@@ -112,19 +113,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
int taskNameHash
) {
super(
- ctx,
- nodeId,
- rmtThreadId,
- xidVer,
- commitVer,
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
- timeout,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
txSize,
- subjId,
+ subjId,
taskNameHash
);
@@ -138,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
readMap = Collections.emptyMap();
- writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+ writeMap = new LinkedHashMap<>(U.capacity(txSize));
topologyVersion(topVer);
}
@@ -183,19 +184,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
int taskNameHash
) {
super(
- ctx,
- nodeId,
- rmtThreadId,
- xidVer,
- commitVer,
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
- timeout,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
txSize,
- subjId,
+ subjId,
taskNameHash
);
@@ -207,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
this.rmtFutId = rmtFutId;
readMap = Collections.emptyMap();
- writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+ writeMap = new LinkedHashMap<>(U.capacity(txSize));
topologyVersion(topVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 25028c4..d11db2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
@@ -60,7 +62,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionOptimisticException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -478,7 +479,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
txMapping = new GridDhtTxMapping();
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
+ Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
for (int cacheId : tx.activeCacheIds()) {
@@ -555,7 +556,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
*
* @param mappings Queue of mappings.
*/
- private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
+ private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) {
if (isDone())
return;
@@ -757,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
private AtomicBoolean rcvRes = new AtomicBoolean(false);
/** Mappings to proceed prepare. */
- private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+ private Queue<GridDistributedTxMapping> mappings;
/**
* @param m Mapping.
@@ -765,7 +766,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
*/
MiniFuture(
GridDistributedTxMapping m,
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+ Queue<GridDistributedTxMapping> mappings
) {
this.m = m;
this.mappings = mappings;
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index b62bbea..d058b67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -92,7 +92,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private AtomicReference<Throwable> err = new AtomicReference<>(null);
/** Node mappings. */
- private ConcurrentMap<UUID, GridDistributedTxMapping> mappings;
+ private Map<UUID, GridDistributedTxMapping> mappings;
/** Trackable flag. */
private boolean trackable = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 216f978..17e3ac1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -87,7 +87,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** DHT mappings. */
- private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
+ private Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
/** Future. */
@GridToStringExclude
@@ -424,7 +424,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* @return DHT map.
*/
- ConcurrentMap<UUID, GridDistributedTxMapping> mappings() {
+ Map<UUID, GridDistributedTxMapping> mappings() {
return mappings;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4074eee..032e043 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -851,8 +851,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
GridCacheVersion finishTn = cctx.versions().last();
// Add future to prepare queue only on first prepare call.
- if (tx.markPreparing())
- prepareQ.offer(tx);
+// if (tx.markPreparing())
+// prepareQ.offer(tx);
+
+ tx.markPreparing();
// Check that our read set does not intersect with write set
// of all transactions that completed their write phase
@@ -888,50 +890,50 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Check that our read and write sets do not intersect with write
// sets of all active transactions.
- for (Iterator<IgniteInternalTx> iter = prepareQ.iterator(); iter.hasNext();) {
- IgniteInternalTx prepareTx = iter.next();
-
- if (prepareTx == tx)
- // Skip yourself.
- continue;
-
- // Optimistically remove completed transactions.
- if (prepareTx.done()) {
- iter.remove();
-
- if (log.isDebugEnabled())
- log.debug("Removed finished transaction from active queue: " + prepareTx);
-
- continue;
- }
-
- // Check if originating node left.
- if (cctx.discovery().node(prepareTx.nodeId()) == null) {
- iter.remove();
-
- rollbackTx(prepareTx);
-
- if (log.isDebugEnabled())
- log.debug("Removed and rolled back transaction because sender node left grid: " +
- CU.txString(prepareTx));
-
- continue;
- }
-
- if (tx.serializable() && !prepareTx.isRollbackOnly()) {
- Set<IgniteTxKey> prepareWriteSet = prepareTx.writeSet();
-
- if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) {
- // Remove from active set.
- iter.remove();
-
- tx.setRollbackOnly();
-
- throw new IgniteTxOptimisticCheckedException(
- "Failed to prepare transaction (read-set/write-set conflict): " + tx);
- }
- }
- }
+// for (Iterator<IgniteInternalTx> iter = prepareQ.iterator(); iter.hasNext();) {
+// IgniteInternalTx prepareTx = iter.next();
+//
+// if (prepareTx == tx)
+// // Skip yourself.
+// continue;
+//
+// // Optimistically remove completed transactions.
+// if (prepareTx.done()) {
+// iter.remove();
+//
+// if (log.isDebugEnabled())
+// log.debug("Removed finished transaction from active queue: " + prepareTx);
+//
+// continue;
+// }
+//
+// // Check if originating node left.
+// if (cctx.discovery().node(prepareTx.nodeId()) == null) {
+// iter.remove();
+//
+// rollbackTx(prepareTx);
+//
+// if (log.isDebugEnabled())
+// log.debug("Removed and rolled back transaction because sender node left grid: " +
+// CU.txString(prepareTx));
+//
+// continue;
+// }
+//
+// if (tx.serializable() && !prepareTx.isRollbackOnly()) {
+// Set<IgniteTxKey> prepareWriteSet = prepareTx.writeSet();
+//
+// if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) {
+// // Remove from active set.
+// iter.remove();
+//
+// tx.setRollbackOnly();
+//
+// throw new IgniteTxOptimisticCheckedException(
+// "Failed to prepare transaction (read-set/write-set conflict): " + tx);
+// }
+// }
+// }
}
// Optimistic.
@@ -1097,23 +1099,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return If transaction was not already present in completed set.
*/
public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) {
- if (nearXidVer != null)
- xidVer = new CommittedVersion(xidVer, nearXidVer);
-
- Boolean committed = completedVers.putIfAbsent(xidVer, true);
-
- if (committed == null || committed) {
- if (log.isDebugEnabled())
- log.debug("Added transaction to committed version set: " + xidVer);
-
- return true;
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Transaction is already present in rolled back version set: " + xidVer);
-
- return false;
- }
+// if (nearXidVer != null)
+// xidVer = new CommittedVersion(xidVer, nearXidVer);
+//
+// Boolean committed = completedVers.putIfAbsent(xidVer, true);
+//
+// if (committed == null || committed) {
+// if (log.isDebugEnabled())
+// log.debug("Added transaction to committed version set: " + xidVer);
+//
+// return true;
+// }
+// else {
+// if (log.isDebugEnabled())
+// log.debug("Transaction is already present in rolled back version set: " + xidVer);
+//
+// return false;
+// }
+ return true;
}
/**
@@ -1121,20 +1124,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return If transaction was not already present in completed set.
*/
public boolean addRolledbackTx(GridCacheVersion xidVer) {
- Boolean committed = completedVers.putIfAbsent(xidVer, false);
-
- if (committed == null || !committed) {
- if (log.isDebugEnabled())
- log.debug("Added transaction to rolled back version set: " + xidVer);
- return true;
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Transaction is already present in committed version set: " + xidVer);
-
- return false;
- }
+ return true;
+// Boolean committed = completedVers.putIfAbsent(xidVer, false);
+//
+// if (committed == null || !committed) {
+// if (log.isDebugEnabled())
+// log.debug("Added transaction to rolled back version set: " + xidVer);
+//
+// return true;
+// }
+// else {
+// if (log.isDebugEnabled())
+// log.debug("Transaction is already present in committed version set: " + xidVer);
+//
+// return false;
+// }
}
/**
@@ -1261,19 +1266,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* so we don't do it here.
*/
- Boolean committed = completedVers.get(tx.xidVersion());
-
- // 1. Make sure that committed version has been recorded.
- if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
- uncommitTx(tx);
-
- GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey();
- GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey();
-
- throw new IgniteException("Missing commit version (consider increasing " +
- IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
- first + ", lastVer=" + last + ", tx=" + tx.xid() + ']');
- }
+// Boolean committed = completedVers.get(tx.xidVersion());
+//
+// // 1. Make sure that committed version has been recorded.
+// if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
+// uncommitTx(tx);
+//
+// GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey();
+// GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey();
+//
+// throw new IgniteException("Missing commit version (consider increasing " +
+// IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
+// first + ", lastVer=" + last + ", tx=" + tx.xid() + ']');
+// }
ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
index 4ca00d9..d9ffdd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.util;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+
/**
*
*/
@@ -29,7 +31,7 @@ public final class IgniteUuidCache {
/** Cache. */
private static final ConcurrentMap<UUID, UUID> cache =
- new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64);
+ new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64, PER_SEGMENT_Q);
/**
* Gets cached UUID to preserve memory.
@@ -56,4 +58,4 @@ public final class IgniteUuidCache {
private IgniteUuidCache() {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 0a6d9aa..c795578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -17,25 +17,23 @@
package org.apache.ignite.internal.util.future;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicMarkableReference;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Future composed of multiple inner futures.
@@ -44,33 +42,44 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** */
private static final long serialVersionUID = 0L;
- /** Futures. */
- private final ConcurrentLinkedDeque8<IgniteInternalFuture<T>> futs = new ConcurrentLinkedDeque8<>();
+ /** */
+ private static final int INITED = 0b1;
- /** Pending futures. */
- private final Collection<IgniteInternalFuture<T>> pending = new ConcurrentLinkedDeque8<>();
+ /** */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> flagsUpd =
+ AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags");
- /** Listener call count. */
- private final AtomicInteger lsnrCalls = new AtomicInteger();
+ /** */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> lsnrCallsUpd =
+ AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
- /** Finished flag. */
- private final AtomicBoolean finished = new AtomicBoolean();
+ /** */
+ private static final AtomicReferenceFieldUpdater<GridCompoundFuture, Throwable> errUpd =
+ AtomicReferenceFieldUpdater.newUpdater(GridCompoundFuture.class, Throwable.class, "err");
+
+ /** Futures. */
+ private final Collection<IgniteInternalFuture<T>> futs = new ArrayList<>();
/** Reducer. */
@GridToStringInclude
private IgniteReducer<T, R> rdc;
- /** Initialize flag. */
- private AtomicBoolean init = new AtomicBoolean(false);
-
- /** Result with a flag to control if reducer has been called. */
- private AtomicMarkableReference<R> res = new AtomicMarkableReference<>(null, false);
-
/** Exceptions to ignore. */
private Class<? extends Throwable>[] ignoreChildFailures;
/** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>();
+ private volatile Throwable err;
+
+ /**
+ * @see #INITED
+ */
+ private volatile int flags;
+
+ /** */
+ private volatile int lsnrCalls;
+
+ /** */
+ private final Object mux = new Object();
/**
*
@@ -104,7 +113,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
if (onCancelled()) {
- for (IgniteInternalFuture<T> fut : futs)
+ for (IgniteInternalFuture<T> fut : futures())
fut.cancel();
return true;
@@ -118,8 +127,25 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*
* @return Collection of futures.
*/
+ private Collection<IgniteInternalFuture<T>> futures(boolean pending) {
+ synchronized (mux) {
+ Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size());
+
+ for (IgniteInternalFuture<T> fut : futs) {
+ if (!pending || !fut.isDone())
+ res.add(fut);
+ }
+
+ return res;
+ }
+ }
+ /**
+ * Gets collection of futures.
+ *
+ * @return Collection of futures.
+ */
public Collection<IgniteInternalFuture<T>> futures() {
- return futs;
+ return futures(false);
}
/**
@@ -128,7 +154,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @return Pending futures.
*/
public Collection<IgniteInternalFuture<T>> pending() {
- return pending;
+ return futures(true);
}
/**
@@ -147,7 +173,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @return {@code True} if there are pending futures.
*/
public boolean hasPending() {
- return !pending.isEmpty();
+ return !pending().isEmpty();
}
/**
@@ -155,7 +181,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* {@link #markInitialized()} method is called on future.
*/
public boolean initialized() {
- return init.get();
+ return flagSet(INITED);
}
/**
@@ -166,8 +192,9 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
public void add(IgniteInternalFuture<T> fut) {
assert fut != null;
- pending.add(fut);
- futs.add(fut);
+ synchronized (mux) {
+ futs.add(fut);
+ }
fut.listen(new Listener());
@@ -219,10 +246,34 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
/**
+ * @param flag
+ * @return
+ */
+ private boolean casFlag(int flag) {
+ for (;;) {
+ int flags0 = flags;
+
+ if ((flags0 & flag) != 0)
+ return false;
+
+ if (flagsUpd.compareAndSet(this, flags0, flags0 | flag))
+ return true;
+ }
+ }
+
+ /**
+ * @param flag
+ * @return
+ */
+ private boolean flagSet(int flag) {
+ return (flags & flag) != 0;
+ }
+
+ /**
* Mark this future as initialized.
*/
public void markInitialized() {
- if (init.compareAndSet(false, true))
+ if (casFlag(INITED))
// Check complete to make sure that we take care
// of all the ignored callbacks.
checkComplete();
@@ -232,32 +283,44 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* Check completeness of the future.
*/
private void checkComplete() {
- Throwable err = this.err.get();
+ Throwable err = this.err;
boolean ignore = ignoreFailure(err);
- if (init.get() && (res.isMarked() || lsnrCalls.get() == futs.sizex() || (err != null && !ignore))
- && finished.compareAndSet(false, true)) {
+ if (flagSet(INITED) && !isDone() &&
+ ((err != null && !ignore) || lsnrCalls == futuresSize())) {
try {
- if (err == null && rdc != null && !res.isMarked())
- res.compareAndSet(null, rdc.reduce(), false, true);
+ onDone(
+ rdc != null ? rdc.reduce() : null,
+ ignore ? null : err);
}
catch (RuntimeException e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ U.error(
+ null,
+ "Failed to execute compound future reducer: " + this,
+ e);
onDone(e);
-
- return;
}
catch (AssertionError e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ U.error(
+ null,
+ "Failed to execute compound future reducer: " + this,
+ e);
onDone(e);
throw e;
}
+ }
+ }
- onDone(res.getReference(), ignore ? null : err);
+ /**
+ * @return
+ */
+ private int futuresSize() {
+ synchronized (mux) {
+ return futs.size();
}
}
@@ -286,13 +349,15 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
return S.toString(GridCompoundFuture.class, this,
"done", isDone(),
"cancelled", isCancelled(),
- "err", error(),
- "futs",
- F.viewReadOnly(futs, new C1<IgniteInternalFuture<T>, String>() {
- @Override public String apply(IgniteInternalFuture<T> f) {
- return Boolean.toString(f.isDone());
- }
- })
+ "err", error()
+// ,
+//
+// "futs",
+// F.viewReadOnly(futs, new C1<IgniteInternalFuture<T>, String>() {
+// @Override public String apply(IgniteInternalFuture<T> f) {
+// return Boolean.toString(f.isDone());
+// }
+// })
);
}
@@ -305,14 +370,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** {@inheritDoc} */
@Override public void apply(IgniteInternalFuture<T> fut) {
- pending.remove(fut);
-
try {
T t = fut.get();
try {
- if (rdc != null && !rdc.collect(t) && !res.isMarked())
- res.compareAndSet(null, rdc.reduce(), false, true);
+ if (rdc != null && !rdc.collect(t))
+ onDone(rdc.reduce());
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -331,18 +394,18 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException |
ClusterTopologyCheckedException e) {
- err.compareAndSet(null, e);
+ errUpd.compareAndSet(GridCompoundFuture.this, null, e);
}
catch (IgniteCheckedException e) {
if (!ignoreFailure(e))
U.error(null, "Failed to execute compound future reducer: " + this, e);
- err.compareAndSet(null, e);
+ errUpd.compareAndSet(GridCompoundFuture.this, null, e);
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
- err.compareAndSet(null, e);
+ errUpd.compareAndSet(GridCompoundFuture.this, null, e);
}
catch (AssertionError e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -353,7 +416,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
throw e;
}
- lsnrCalls.incrementAndGet();
+ lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this);
checkComplete();
}
@@ -363,4 +426,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
return "Compound future listener: " + GridCompoundFuture.this;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
index d93f12e..b3747d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
@@ -3805,4 +3805,4 @@ public class ConcurrentHashMap8<K, V>
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
index 75db13c..28e38d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
@@ -1735,4 +1735,4 @@ public class ConcurrentLinkedDeque8<E>
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
index 5b7381e..22baa46 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
@@ -2163,4 +2163,4 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
*/
PER_SEGMENT_Q_OPTIMIZED_RMV
}
-}
\ No newline at end of file
+}