You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/09 10:16:31 UTC
[1/3] ignite git commit: Performance optimizations.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5 739322747 -> a4848a702
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/util/GridConcurrentLinkedHashMapMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridConcurrentLinkedHashMapMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridConcurrentLinkedHashMapMultiThreadedSelfTest.java
index 5c6cfcf..9fe2690 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridConcurrentLinkedHashMapMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridConcurrentLinkedHashMapMultiThreadedSelfTest.java
@@ -146,40 +146,41 @@ public class GridConcurrentLinkedHashMapMultiThreadedSelfTest extends GridCommon
public void testEvictPerSegment() throws Exception {
info(">>> Test grid concurrent linked hash map...");
- final int maxSize = 1000;
+ int concurLvl = 64;
+ final int maxSize = concurLvl * 30;
+ int diff = (int)(maxSize * 0.1);
ConcurrentLinkedHashMap<Integer, String> linkedMap = new ConcurrentLinkedHashMap<>(
- 32, 0.75f, 64, maxSize, PER_SEGMENT_Q);
+ 32, 0.75f, concurLvl, maxSize, PER_SEGMENT_Q);
int keyCnt = 1000000;
- putMultiThreaded(linkedMap, 10, keyCnt, maxSize);
-
- int diff = 10; // 1% of 1000.
+ Map<String, LinkedList<Integer>> map = putMultiThreaded(
+ linkedMap,
+ 10,
+ keyCnt,
+ maxSize * 10); // Intentionally memorize more than maxSize since in this mode LRU is not fair.
assertTrue("Invalid map size: " + linkedMap.size(), U.safeAbs(maxSize - linkedMap.size()) <= diff);
assertTrue("Invalid map sizex: " + linkedMap.sizex(), U.safeAbs(maxSize - linkedMap.sizex()) <= diff);
-// TODO IGNITE-606 - Need to fix iterators for ConcurrentLinkedHashMap in perSegment mode
-// LinkedList<Integer> keys = new LinkedList<Integer>(linkedMap.keySet());
-//
-// while (!keys.isEmpty()) {
-// boolean found = false;
-//
-// int key = keys.removeLast();
-//
-// for (LinkedList<Integer> threadKeys : map.values()) {
-// if (threadKeys.getLast() == key) {
-// threadKeys.removeLast();
-//
-// found = true;
-//
-// break;
-// }
-// }
-//
-// assertTrue("Key was not found on the top of any thread: " + key, found);
-// }
+ LinkedList<Integer> keys = new LinkedList<>(linkedMap.keySet());
+
+ while (!keys.isEmpty()) {
+ boolean found = false;
+
+ int key = keys.removeLast();
+
+ for (LinkedList<Integer> threadKeys : map.values()) {
+ if (threadKeys.contains(key)) {
+ found = true;
+
+ break;
+ }
+ }
+
+ assertTrue("Key was not found in any thread: " + key, found);
+ }
int min = Integer.MAX_VALUE;
int max = 0;
@@ -207,40 +208,41 @@ public class GridConcurrentLinkedHashMapMultiThreadedSelfTest extends GridCommon
public void testEvictPerSegmentOptimizedRemoves() throws Exception {
info(">>> Test grid concurrent linked hash map...");
- final int maxSize = 1000;
+ int concurLvl = 64;
+ final int maxSize = concurLvl * 30;
+ int diff = (int)(maxSize * 0.1);
ConcurrentLinkedHashMap<Integer, String> linkedMap = new ConcurrentLinkedHashMap<>(
- 32, 0.75f, 64, maxSize, PER_SEGMENT_Q_OPTIMIZED_RMV);
+ 32, 0.75f, concurLvl, maxSize, PER_SEGMENT_Q_OPTIMIZED_RMV);
int keyCnt = 1000000;
- putMultiThreaded(linkedMap, 10, keyCnt, maxSize);
-
- int diff = 10; // 1% of 1000.
+ Map<String, LinkedList<Integer>> map = putMultiThreaded(
+ linkedMap,
+ 10,
+ keyCnt,
+ maxSize * 10); // Intentionally memorize more than maxSize since in this mode LRU is not fair.
assertTrue("Invalid map size: " + linkedMap.size(), U.safeAbs(maxSize - linkedMap.size()) <= diff);
assertTrue("Invalid map sizex: " + linkedMap.sizex(), U.safeAbs(maxSize - linkedMap.sizex()) <= diff);
-// TODO IGNITE-606 - Need to fix iterators for ConcurrentLinkedHashMap in perSegment mode
-// LinkedList<Integer> keys = new LinkedList<Integer>(linkedMap.keySet());
-//
-// while (!keys.isEmpty()) {
-// boolean found = false;
-//
-// int key = keys.removeLast();
-//
-// for (LinkedList<Integer> threadKeys : map.values()) {
-// if (threadKeys.getLast() == key) {
-// threadKeys.removeLast();
-//
-// found = true;
-//
-// break;
-// }
-// }
-//
-// assertTrue("Key was not found on the top of any thread: " + key, found);
-// }
+ LinkedList<Integer> keys = new LinkedList<>(linkedMap.keySet());
+
+ while (!keys.isEmpty()) {
+ boolean found = false;
+
+ int key = keys.removeLast();
+
+ for (LinkedList<Integer> threadKeys : map.values()) {
+ if (threadKeys.contains(key)) {
+ found = true;
+
+ break;
+ }
+ }
+
+ assertTrue("Key was not found in any thread: " + key, found);
+ }
int min = Integer.MAX_VALUE;
int max = 0;
@@ -558,4 +560,4 @@ public class GridConcurrentLinkedHashMapMultiThreadedSelfTest extends GridCommon
info(">>> put get remove test complete [duration = " + (System.currentTimeMillis() - start) + ']');
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
index ead3a63..9c3389f 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
@@ -19,13 +19,23 @@ package org.apache.ignite.yardstick.cache;
import java.util.Map;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
/**
* Ignite benchmark that performs transactional put operations.
*/
public class IgnitePutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
/** {@inheritDoc} */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+ ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+ }
+
+ /** {@inheritDoc} */
@Override public boolean test(Map<Object, Object> ctx) throws Exception {
int key = nextRandom(args.range());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxPrimaryOnlyBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxPrimaryOnlyBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxPrimaryOnlyBenchmark.java
new file mode 100644
index 0000000..21275eb
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxPrimaryOnlyBenchmark.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+/**
+ * Ignite benchmark that performs transactional put operations skipping key if local node is backup.
+ */
+public class IgnitePutTxPrimaryOnlyBenchmark extends IgniteCacheAbstractBenchmark {
+ /** {@inheritDoc} */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+ ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ int key;
+
+ Affinity<Object> aff = ignite().affinity("tx");
+ ClusterNode locNode = ignite().cluster().localNode();
+
+ for (;;) {
+ key = nextRandom(args.range());
+
+ // Exit only if primary.
+ if (aff.isPrimary(locNode, key))
+ break;
+ }
+
+ // Implicit transaction is used.
+ cache.put(key, new SampleValue(key));
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("tx");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxSkipLocalBackupBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxSkipLocalBackupBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxSkipLocalBackupBenchmark.java
new file mode 100644
index 0000000..63934e6
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxSkipLocalBackupBenchmark.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+/**
+ * Ignite benchmark that performs transactional put operations skipping key if local node is backup.
+ */
+public class IgnitePutTxSkipLocalBackupBenchmark extends IgniteCacheAbstractBenchmark {
+ /** {@inheritDoc} */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+ ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ int key;
+
+ Affinity<Object> aff = ignite().affinity("tx");
+ ClusterNode locNode = ignite().cluster().localNode();
+
+ for (;;) {
+ key = nextRandom(args.range());
+
+ // Skip key if local node is backup.
+ if (!aff.isBackup(locNode, key))
+ break;
+ }
+
+ // Implicit transaction is used.
+ cache.put(key, new SampleValue(key));
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("tx");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java
new file mode 100644
index 0000000..83c50bd
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.yardstickframework.BenchmarkUtils;
+
+/**
+ *
+ */
+public class WaitMapExchangeFinishCallable implements IgniteCallable<Void> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public Void call() throws Exception {
+ Collection<IgniteInternalCache<?, ?>> cachesx = ((IgniteKernal)ignite).cachesx(null);
+
+ for (IgniteInternalCache<?, ?> cache : cachesx) {
+ try {
+ GridDhtPartitionTopology top = cache.context().isNear() ? cache.context().near().dht().topology() :
+ cache.context().dht().topology();
+
+ BenchmarkUtils.println("Validating cache: " + cache.name());
+
+ for (;;) {
+ boolean success = true;
+
+ if (top.topologyVersion().topologyVersion() == ignite.cluster().topologyVersion()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : top.partitionMap(true).entrySet()) {
+ for (Map.Entry<Integer, GridDhtPartitionState> p : e.getValue().entrySet()) {
+ if (p.getValue() != GridDhtPartitionState.OWNING) {
+ BenchmarkUtils.println("Not owning partition [part=" + p.getKey() +
+ ", state=" + p.getValue() + ']');
+
+ success = false;
+
+ break;
+ }
+ }
+
+ if (!success)
+ break;
+ }
+ }
+ else {
+ BenchmarkUtils.println("Topology version is different [cache=" + top.topologyVersion() +
+ ", cluster=" + ignite.cluster().topologyVersion() + ']');
+
+ success = false;
+ }
+
+ if (!success)
+ Thread.sleep(1000);
+ else {
+ BenchmarkUtils.println("Cache state is fine: " + cache.name());
+
+ break;
+ }
+ }
+ }
+ catch (RuntimeException e1) {
+ BenchmarkUtils.println("Ignored exception: " + e1);
+ }
+ }
+
+ return null;
+ }
+}
[3/3] ignite git commit: Performance optimizations.
Posted by sb...@apache.org.
Performance optimizations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4848a70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4848a70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4848a70
Branch: refs/heads/ignite-1.5
Commit: a4848a702fea1573af7f36af91d02f7df3ab64f4
Parents: 7393227
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 9 12:16:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 9 12:16:16 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoMessage.java | 4 +-
.../processors/cache/GridCacheContext.java | 29 +--
.../processors/cache/GridCacheEntryEx.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 55 ++--
.../processors/cache/GridCacheMvccManager.java | 145 +++++------
.../distributed/GridDistributedCacheEntry.java | 2 +-
.../distributed/GridDistributedTxMapping.java | 8 +-
.../GridDistributedTxRemoteAdapter.java | 5 +-
.../distributed/dht/GridDhtLockFuture.java | 7 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 13 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 43 ++--
.../distributed/dht/GridDhtTxPrepareFuture.java | 78 +++---
.../cache/distributed/dht/GridDhtTxRemote.java | 45 ++--
.../dht/atomic/GridDhtAtomicCache.java | 1 -
.../dht/colocated/GridDhtColocatedCache.java | 7 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 11 +-
.../near/GridNearTransactionalCache.java | 7 +-
.../near/GridNearTxFinishFuture.java | 157 ++++++------
.../cache/distributed/near/GridNearTxLocal.java | 21 +-
.../cache/transactions/IgniteInternalTx.java | 3 +-
.../cache/transactions/IgniteTxAdapter.java | 251 ++++++++++---------
.../cache/transactions/IgniteTxHandler.java | 37 +--
.../transactions/IgniteTxLocalAdapter.java | 26 +-
.../cache/transactions/IgniteTxManager.java | 171 ++++++-------
.../GridBoundedConcurrentLinkedHashMap.java | 7 +-
.../GridBoundedConcurrentLinkedHashSet.java | 7 +-
.../util/GridBoundedConcurrentOrderedMap.java | 39 +--
.../internal/util/GridConcurrentFactory.java | 11 +-
.../util/GridConcurrentLinkedHashSet.java | 9 +-
.../ignite/internal/util/IgniteUuidCache.java | 6 +-
.../util/future/GridCompoundFuture.java | 155 ++++++++----
.../java/org/jsr166/ConcurrentHashMap8.java | 2 +-
.../java/org/jsr166/ConcurrentLinkedDeque8.java | 2 +-
.../org/jsr166/ConcurrentLinkedHashMap.java | 195 +++++++++++---
.../GridCacheAffinityBackupsSelfTest.java | 8 +
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
.../GridCacheMissingCommitVersionSelfTest.java | 40 +--
.../processors/cache/GridCacheTestEntryEx.java | 3 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 2 +-
.../continuous/GridEventConsumeSelfTest.java | 2 +-
...dBoundedConcurrentLinkedHashMapSelfTest.java | 2 +-
.../GridConcurrentLinkedHashMapSelfTest.java | 62 ++++-
.../junits/common/GridCommonAbstractTest.java | 4 +-
...rrentLinkedHashMapMultiThreadedSelfTest.java | 104 ++++----
.../yardstick/cache/IgnitePutTxBenchmark.java | 10 +
.../cache/IgnitePutTxPrimaryOnlyBenchmark.java | 65 +++++
.../IgnitePutTxSkipLocalBackupBenchmark.java | 65 +++++
.../cache/WaitMapExchangeFinishCallable.java | 95 +++++++
48 files changed, 1220 insertions(+), 807 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index c83a281..cb19ba0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -143,7 +143,7 @@ public class GridIoMessage implements Message {
/**
* @return Message.
*/
- public Object message() {
+ public Message message() {
return msg;
}
@@ -320,4 +320,4 @@ public class GridIoMessage implements Message {
@Override public String toString() {
return S.toString(GridIoMessage.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f4852c..ee4da46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1488,10 +1488,9 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param log Log.
* @param dhtMap Dht mappings.
* @param nearMap Near mappings.
- * @return {@code True} if mapped.
* @throws GridCacheEntryRemovedException If reader for entry is removed.
*/
- public boolean dhtMap(
+ public void dhtMap(
UUID nearNodeId,
AffinityTopologyVersion topVer,
GridDhtCacheEntry entry,
@@ -1509,7 +1508,7 @@ public class GridCacheContext<K, V> implements Externalizable {
Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, F.remoteNodes(nodeId())); // Exclude local node.
- boolean ret = map(entry, dhtRemoteNodes, dhtMap);
+ map(entry, dhtRemoteNodes, dhtMap);
Collection<ClusterNode> nearRemoteNodes = null;
@@ -1530,7 +1529,7 @@ public class GridCacheContext<K, V> implements Externalizable {
if (nearNodes != null && !nearNodes.isEmpty()) {
nearRemoteNodes = F.view(nearNodes, F.notIn(dhtNodes));
- ret |= map(entry, nearRemoteNodes, nearMap);
+ map(entry, nearRemoteNodes, nearMap);
}
}
@@ -1540,8 +1539,6 @@ public class GridCacheContext<K, V> implements Externalizable {
entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds);
}
-
- return ret;
}
/**
@@ -1549,10 +1546,9 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param log Log.
* @param dhtMap Dht mappings.
* @param nearMap Near mappings.
- * @return {@code True} if mapped.
* @throws GridCacheEntryRemovedException If reader for entry is removed.
*/
- public boolean dhtMap(
+ public void dhtMap(
GridDhtCacheEntry entry,
GridCacheVersion explicitLockVer,
IgniteLogger log,
@@ -1571,27 +1567,20 @@ public class GridCacheContext<K, V> implements Externalizable {
Collection<ClusterNode> nearNodes = cand.mappedNearNodes();
- boolean ret = map(entry, dhtNodes, dhtMap);
+ map(entry, dhtNodes, dhtMap);
if (nearNodes != null && !nearNodes.isEmpty())
- ret |= map(entry, nearNodes, nearMap);
-
- return ret;
+ map(entry, nearNodes, nearMap);
}
-
- return false;
}
/**
* @param entry Entry.
* @param nodes Nodes.
* @param map Map.
- * @return {@code True} if mapped.
*/
- private boolean map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes,
+ private void map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes,
Map<ClusterNode, List<GridDhtCacheEntry>> map) {
- boolean ret = false;
-
if (nodes != null) {
for (ClusterNode n : nodes) {
List<GridDhtCacheEntry> entries = map.get(n);
@@ -1600,12 +1589,8 @@ public class GridCacheContext<K, V> implements Externalizable {
map.put(n, entries = new LinkedList<>());
entries.add(entry);
-
- ret = true;
}
}
-
- return ret;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 50b01c8..af62e39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -389,7 +389,6 @@ public interface GridCacheEntryEx {
* @param tx Cache transaction.
* @param evtNodeId ID of node responsible for this change.
* @param affNodeId Partitioned node iD.
- * @param writeThrough If {@code true}, persist to the storage.
* @param retval {@code True} if value should be returned (and unmarshalled if needed).
* @param evt Flag to signal event notification.
* @param metrics Flag to signal metrics notification.
@@ -409,7 +408,6 @@ public interface GridCacheEntryEx {
@Nullable IgniteInternalTx tx,
UUID evtNodeId,
UUID affNodeId,
- boolean writeThrough,
boolean retval,
boolean evt,
boolean metrics,
@@ -1014,4 +1012,4 @@ public interface GridCacheEntryEx {
* Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
*/
public void onUnlock();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ca0995a..df9f5c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1108,7 +1108,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheLazyEntry e = new CacheLazyEntry(cctx, key, old);
- Object interceptorVal = cctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(cctx, key, old),
+ Object interceptorVal = cctx.config().getInterceptor().onBeforePut(
+ new CacheLazyEntry(cctx, key, old),
val0);
key0 = e.key();
@@ -1212,7 +1213,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteInternalTx tx,
UUID evtNodeId,
UUID affNodeId,
- boolean writeThrough,
boolean retval,
boolean evt,
boolean metrics,
@@ -1244,6 +1244,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Cache.Entry entry0 = null;
+ boolean deferred;
+
+ boolean marked = false;
+
synchronized (this) {
checkObsolete();
@@ -1349,40 +1353,33 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
cctx.dataStructures().onEntryUpdated(key, true);
- }
-
- // Persist outside of synchronization. The correctness of the
- // value will be handled by current transaction.
- if (writeThrough)
- cctx.store().remove(tx, keyValue(false));
- if (cctx.deferredDelete() && !detached() && !isInternal())
- cctx.onDeferredDelete(this, newVer);
- else {
- boolean marked = false;
+ deferred = cctx.deferredDelete() && !detached() && !isInternal();
- synchronized (this) {
+ if (!deferred) {
// If entry is still removed.
- if (newVer == ver) {
- if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
- if (log.isDebugEnabled())
- log.debug("Entry could not be marked obsolete (it is still used): " + this);
- }
- else {
- recordNodeId(affNodeId, topVer);
+ assert newVer == ver;
- // If entry was not marked obsolete, then removed lock
- // will be registered whenever removeLock is called.
- cctx.mvcc().addRemoved(cctx, obsoleteVer);
+ if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
+ if (log.isDebugEnabled())
+ log.debug("Entry could not be marked obsolete (it is still used): " + this);
+ }
+ else {
+ recordNodeId(affNodeId, topVer);
- if (log.isDebugEnabled())
- log.debug("Entry was marked obsolete: " + this);
- }
+ if (log.isDebugEnabled())
+ log.debug("Entry was marked obsolete: " + this);
}
}
+ }
- if (marked)
- onMarkedObsolete();
+ if (deferred)
+ cctx.onDeferredDelete(this, newVer);
+
+ if (marked) {
+ assert !deferred;
+
+ onMarkedObsolete();
}
if (intercept)
@@ -4247,4 +4244,4 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return "IteratorEntry [key=" + key + ']';
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0960c9d..2c14209 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -24,7 +25,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
@@ -88,7 +89,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit;
/** Set of removed lock versions. */
- private Collection<GridCacheVersion> rmvLocks =
+ private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks =
new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q);
/** Current local candidates. */
@@ -114,7 +115,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
/** Finish futures. */
- private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+ private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
/** Logger. */
@SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -143,17 +144,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
Collection<? extends GridCacheFuture> futCol = futs.get(owner.version());
if (futCol != null) {
- for (GridCacheFuture fut : futCol) {
- if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
- GridCacheMvccFuture<Boolean> mvccFut =
- (GridCacheMvccFuture<Boolean>)fut;
-
- // Since this method is called outside of entry synchronization,
- // we can safely invoke any method on the future.
- // Also note that we don't remove future here if it is done.
- // The removal is initiated from within future itself.
- if (mvccFut.onOwnerChanged(entry, owner))
- return;
+ synchronized (futCol) {
+ for (GridCacheFuture fut : futCol) {
+ if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
+ GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
+
+ // Since this method is called outside of entry synchronization,
+ // we can safely invoke any method on the future.
+ // Also note that we don't remove future here if it is done.
+ // The removal is initiated from within future itself.
+ if (mvccFut.onOwnerChanged(entry, owner))
+ return;
+ }
}
}
}
@@ -171,8 +173,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
else if (log.isDebugEnabled())
log.debug("Failed to find transaction for changed owner: " + owner);
- for (FinishLockFuture f : finishFuts)
- f.recheck(entry);
+ if (!finishFuts.isEmptyx()) {
+ for (FinishLockFuture f : finishFuts)
+ f.recheck(entry);
+ }
}
/** {@inheritDoc} */
@@ -203,21 +207,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Processing node left [nodeId=" + discoEvt.eventNode().id() + "]");
- for (Collection<GridCacheFuture<?>> futsCol : futs.values()) {
- for (GridCacheFuture<?> fut : futsCol) {
- if (!fut.trackable()) {
- if (log.isDebugEnabled())
- log.debug("Skipping non-trackable future: " + fut);
-
- continue;
- }
-
- fut.onNodeLeft(discoEvt.eventNode().id());
-
- if (fut.isCancelled() || fut.isDone())
- removeFuture(fut);
- }
- }
+ for (GridCacheFuture<?> fut : activeFutures())
+ fut.onNodeLeft(discoEvt.eventNode().id());
for (IgniteInternalFuture<?> fut : atomicFuts.values()) {
if (fut instanceof GridCacheFuture) {
@@ -272,7 +263,15 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @return Collection of active futures.
*/
public Collection<GridCacheFuture<?>> activeFutures() {
- return F.flatCollections(futs.values());
+ ArrayList<GridCacheFuture<?>> col = new ArrayList<>();
+
+ for (Collection<GridCacheFuture<?>> verFuts : futs.values()) {
+ synchronized (verFuts) {
+ col.addAll(verFuts);
+ }
+ }
+
+ return col;
}
/**
@@ -345,10 +344,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param err Error.
*/
private void cancelClientFutures(IgniteCheckedException err) {
- for (Collection<GridCacheFuture<?>> futures : futs.values()) {
- for (GridCacheFuture<?> future : futures)
- ((GridFutureAdapter)future).onDone(err);
- }
+ for (GridCacheFuture<?> fut : activeFutures())
+ ((GridFutureAdapter)fut).onDone(err);
for (GridCacheAtomicFuture<?> future : atomicFuts.values())
((GridFutureAdapter)future).onDone(err);
@@ -444,11 +441,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return true;
while (true) {
- Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(),
- new ConcurrentLinkedDeque8<GridCacheFuture<?>>() {
- /** */
- private int hash;
+ Collection<GridCacheFuture<?>> old = futs.get(fut.version());
+ if (old == null) {
+ Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) {
{
// Make sure that we add future to queue before
// adding queue to the map of futures.
@@ -456,16 +452,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
@Override public int hashCode() {
- if (hash == 0)
- hash = System.identityHashCode(this);
-
- return hash;
+ return System.identityHashCode(this);
}
@Override public boolean equals(Object obj) {
return obj == this;
}
- });
+ };
+
+ old = futs.putIfAbsent(fut.version(), col);
+ }
if (old != null) {
boolean empty, dup = false;
@@ -474,10 +470,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
empty = old.isEmpty();
if (!empty)
- dup = old.contains(fut);
-
- if (!empty && !dup)
- old.add(fut);
+ dup = !old.add(fut);
}
// Future is being removed, so we force-remove here and try again.
@@ -594,14 +587,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
- if (futs != null)
- for (GridCacheFuture<?> fut : futs)
- if (fut.futureId().equals(futId)) {
- if (log.isDebugEnabled())
- log.debug("Found future in futures map: " + fut);
+ if (futs != null) {
+ synchronized (futs) {
+ for (GridCacheFuture<?> fut : futs) {
+ if (fut.futureId().equals(futId)) {
+ if (log.isDebugEnabled())
+ log.debug("Found future in futures map: " + fut);
- return fut;
+ return fut;
+ }
}
+ }
+ }
if (log.isDebugEnabled())
log.debug("Failed to find future in futures map [ver=" + ver + ", futId=" + futId + ']');
@@ -619,7 +616,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) {
Collection c = futs.get(ver);
- return c == null ? Collections.<IgniteInternalFuture<T>>emptyList() : (Collection<IgniteInternalFuture<T>>)c;
+ if (c == null)
+ return Collections.<IgniteInternalFuture<T>>emptyList();
+ else {
+ synchronized (c) {
+ return new ArrayList<>((Collection<IgniteInternalFuture<T>>)c);
+ }
+ }
}
/**
@@ -949,12 +952,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@Override public void printMemoryStats() {
X.println(">>> ");
X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
- X.println(">>> rmvLocksSize: " + rmvLocks.size());
+ X.println(">>> rmvLocksSize: " + rmvLocks.sizex());
X.println(">>> dhtLocCandsSize: " + dhtLocCands.size());
X.println(">>> lockedSize: " + locked.size());
X.println(">>> futsSize: " + futs.size());
X.println(">>> near2dhtSize: " + near2dht.size());
- X.println(">>> finishFutsSize: " + finishFuts.size());
+ X.println(">>> finishFutsSize: " + finishFuts.sizex());
}
/**
@@ -974,9 +977,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
- for (FinishLockFuture fut : finishFuts) {
- if (fut.topologyVersion().equals(topVer))
- cands.putAll(fut.pendingLocks());
+ if (!finishFuts.isEmptyx()) {
+ for (FinishLockFuture fut : finishFuts) {
+ if (fut.topologyVersion().equals(topVer))
+ cands.putAll(fut.pendingLocks());
+ }
}
return cands;
@@ -1054,8 +1059,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param topVer Topology version.
* @return Future that signals when all locks for given partitions will be released.
*/
- private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<GridDistributedCacheEntry> filter,
- AffinityTopologyVersion topVer) {
+ private IgniteInternalFuture<?> finishLocks(
+ @Nullable final IgnitePredicate<GridDistributedCacheEntry> filter,
+ AffinityTopologyVersion topVer
+ ) {
assert topVer.topologyVersion() != 0;
if (topVer.equals(AffinityTopologyVersion.NONE))
@@ -1069,10 +1076,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> e) {
finishFuts.remove(finishFut);
-
- // This call is required to make sure that the concurrent queue
- // clears memory occupied by internal nodes.
- finishFuts.peek();
}
});
@@ -1088,8 +1091,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (exchLog.isDebugEnabled())
exchLog.debug("Rechecking pending locks for completion.");
- for (FinishLockFuture fut : finishFuts)
- fut.recheck();
+ if (!finishFuts.isEmptyx()) {
+ for (FinishLockFuture fut : finishFuts)
+ fut.recheck();
+ }
}
/**
@@ -1250,4 +1255,4 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return S.toString(FinishLockFuture.class, this, super.toString());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index a138d30..a3eb723 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -403,7 +403,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
doomed = mvcc == null ? null : mvcc.candidate(ver);
- if (doomed == null || doomed.dhtLocal() || (!doomed.local() && !doomed.nearLocal()))
+ if (doomed == null)
addRemoved(ver);
GridCacheVersion obsoleteVer = obsoleteVersionExtras();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 1e78ba2..2d2d935 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -23,13 +23,13 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -87,7 +87,7 @@ public class GridDistributedTxMapping implements Externalizable {
public GridDistributedTxMapping(ClusterNode node) {
this.node = node;
- entries = new GridConcurrentLinkedHashSet<>();
+ entries = new LinkedHashSet<>();
}
/**
@@ -297,7 +297,7 @@ public class GridDistributedTxMapping implements Externalizable {
*/
private void ensureModifiable() {
if (readOnly) {
- entries = new GridConcurrentLinkedHashSet<>(entries);
+ entries = new LinkedHashSet<>(entries);
readOnly = false;
}
@@ -330,4 +330,4 @@ public class GridDistributedTxMapping implements Externalizable {
@Override public String toString() {
return S.toString(GridDistributedTxMapping.class, this, "node", node.id());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index fcbf58d..93303c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -583,13 +583,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
eventNodeId(),
nodeId,
false,
- false,
true,
true,
topVer,
null,
replicate ? DR_BACKUP : DR_NONE,
- near() ? null : explicitVer, CU.subjectId(this, cctx),
+ near() ? null : explicitVer,
+ CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer);
else {
@@ -629,7 +629,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
eventNodeId(),
nodeId,
false,
- false,
true,
true,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c175b0b..579d701 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -782,14 +782,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (log.isDebugEnabled())
log.debug("Mapping entry for DHT lock future: " + this);
- boolean hasRmtNodes = false;
-
// Assign keys to primary nodes.
for (GridDhtCacheEntry entry : entries) {
try {
while (true) {
try {
- hasRmtNodes = cctx.dhtMap(
+ cctx.dhtMap(
nearNodeId,
topVer,
entry,
@@ -823,9 +821,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
}
}
- if (tx != null)
- tx.needsCompletedVersions(hasRmtNodes);
-
if (isDone()) {
if (log.isDebugEnabled())
log.debug("Mapping won't proceed because future is done: " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 4ce4759..3069afd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -1090,8 +1090,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
// We have to add completed versions for cases when nearLocal and remote transactions
// execute concurrently.
- res.completedVersions(ctx.tm().committedVersions(req.version()),
- ctx.tm().rolledbackVersions(req.version()));
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(req.version());
+
+ res.completedVersions(versPair.get1(), versPair.get2());
int i = 0;
@@ -1510,8 +1511,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
}
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+ Collection<GridCacheVersion> committed = versPair.get1();
+ Collection<GridCacheVersion> rolledback = versPair.get2();
// Backups.
for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8c7d985..6de8795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -84,7 +83,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
/** Mapped flag. */
- protected AtomicBoolean mapped = new AtomicBoolean();
+ protected volatile boolean mapped;
/** */
private long dhtThreadId;
@@ -92,9 +91,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/** */
protected boolean explicitLock;
- /** */
- private boolean needsCompletedVers;
-
/** Versions of pending locks for entries of this tx. */
private Collection<GridCacheVersion> pendingVers;
@@ -141,20 +137,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
int taskNameHash
) {
super(
- cctx,
- xidVer,
- implicit,
- implicitSingle,
- sys,
- plc,
- concurrency,
- isolation,
- timeout,
+ cctx,
+ xidVer,
+ implicit,
+ implicitSingle,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ timeout,
invalidate,
storeEnabled,
onePhaseCommit,
- txSize,
- subjId,
+ txSize,
+ subjId,
taskNameHash
);
@@ -244,16 +240,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
*/
protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err);
- /**
- * @param needsCompletedVers {@code True} if needs completed versions.
- */
- public void needsCompletedVersions(boolean needsCompletedVers) {
- this.needsCompletedVers |= needsCompletedVers;
- }
-
/** {@inheritDoc} */
@Override public boolean needsCompletedVersions() {
- return needsCompletedVers;
+ return nearOnOriginatingNode;
}
/**
@@ -281,10 +270,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
* Map explicit locks.
*/
protected void mapExplicitLocks() {
- if (!mapped.get()) {
+ if (!mapped) {
// Explicit locks may participate in implicit transactions only.
if (!implicit()) {
- mapped.set(true);
+ mapped = true;
return;
}
@@ -343,7 +332,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
if (!F.isEmpty(nearEntryMap))
addNearNodeEntryMapping(nearEntryMap);
- mapped.set(true);
+ mapped = true;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 61975d7..1d6f633 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,10 +60,10 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -177,7 +179,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Keys that should be locked. */
@GridToStringInclude
- private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ private final Set<IgniteTxKey> lockKeys = new HashSet<>();
/** Force keys future for correct transforms. */
private IgniteInternalFuture<?> forceKeysFut;
@@ -267,7 +269,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (log.isDebugEnabled())
log.debug("Transaction future received owner changed callback: " + entry);
- boolean rmv = lockKeys.remove(entry.txKey());
+ boolean rmv;
+
+ synchronized (lockKeys) {
+ rmv = lockKeys.remove(entry.txKey());
+ }
return rmv && mapIfLocked();
}
@@ -293,7 +299,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return {@code True} if all locks are owned.
*/
private boolean checkLocks() {
- return locksReady && lockKeys.isEmpty();
+ if (!locksReady)
+ return false;
+
+ synchronized (lockKeys) {
+ return lockKeys.isEmpty();
+ }
}
/** {@inheritDoc} */
@@ -495,8 +506,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
txEntry.cached(entry);
}
- if (tx.optimistic() && txEntry.explicitVersion() == null)
- lockKeys.add(txEntry.txKey());
+ if (tx.optimistic() && txEntry.explicitVersion() == null) {
+ synchronized (lockKeys) {
+ lockKeys.add(txEntry.txKey());
+ }
+ }
while (true) {
try {
@@ -689,7 +703,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheVersion min = tx.minVersion();
- res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min));
+ if (tx.needsCompletedVersions()) {
+ IgnitePair<Collection<GridCacheVersion>> versPair = cctx.tm().versions(min);
+
+ res.completedVersions(versPair.get1(), versPair.get2());
+ }
res.pending(localDhtPendingVersions(tx.writeEntries(), min));
@@ -987,21 +1005,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (err0 != null) {
err.compareAndSet(null, err0);
+ tx.rollbackAsync();
+
final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
- tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
- if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) {
- try {
- if (replied.compareAndSet(false, true))
- sendPrepareResponse(res);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response for transaction: " + tx, e);
- }
- }
- }
- });
+ onDone(res, res.error());
return;
}
@@ -1017,20 +1025,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>();
Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>();
- boolean hasRemoteNodes = false;
-
// Assign keys to primary nodes.
if (!F.isEmpty(writes)) {
for (IgniteTxEntry write : writes)
- hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
+ map(tx.entry(write.txKey()), futDhtMap, futNearMap);
}
if (!F.isEmpty(reads)) {
for (IgniteTxEntry read : reads)
- hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
+ map(tx.entry(read.txKey()), futDhtMap, futNearMap);
}
-
- tx.needsCompletedVersions(hasRemoteNodes);
}
if (isDone())
@@ -1223,15 +1227,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param entry Transaction entry.
* @param futDhtMap DHT mapping.
* @param futNearMap Near mapping.
- * @return {@code True} if mapped.
*/
- private boolean map(
+ private void map(
IgniteTxEntry entry,
Map<UUID, GridDistributedTxMapping> futDhtMap,
Map<UUID, GridDistributedTxMapping> futNearMap
) {
if (entry.cached().isLocal())
- return false;
+ return;
GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
@@ -1247,8 +1250,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
entry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
}
- boolean ret;
-
while (true) {
try {
Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
@@ -1272,10 +1273,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
log.debug("Entry has no near readers: " + entry);
// Exclude local node.
- ret = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
+ map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
// Exclude DHT nodes.
- ret |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
+ map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
break;
}
@@ -1285,8 +1286,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
entry.cached(cached);
}
}
-
- return ret;
}
/**
@@ -1294,16 +1293,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param nodes Nodes.
* @param globalMap Map.
* @param locMap Exclude map.
- * @return {@code True} if mapped.
*/
- private boolean map(
+ private void map(
IgniteTxEntry entry,
Iterable<ClusterNode> nodes,
Map<UUID, GridDistributedTxMapping> globalMap,
Map<UUID, GridDistributedTxMapping> locMap
) {
- boolean ret = false;
-
if (nodes != null) {
for (ClusterNode n : nodes) {
GridDistributedTxMapping global = globalMap.get(n.id());
@@ -1332,12 +1328,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
locMap.put(n.id(), loc = new GridDistributedTxMapping(n));
loc.add(entry);
-
- ret = true;
}
}
-
- return ret;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index f8be2a7..e268a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -112,19 +113,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
int taskNameHash
) {
super(
- ctx,
- nodeId,
- rmtThreadId,
- xidVer,
- commitVer,
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
- timeout,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
txSize,
- subjId,
+ subjId,
taskNameHash
);
@@ -138,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
readMap = Collections.emptyMap();
- writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+ writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
topologyVersion(topVer);
}
@@ -183,19 +184,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
int taskNameHash
) {
super(
- ctx,
- nodeId,
- rmtThreadId,
- xidVer,
- commitVer,
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
- timeout,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
txSize,
- subjId,
+ subjId,
taskNameHash
);
@@ -207,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
this.rmtFutId = rmtFutId;
readMap = Collections.emptyMap();
- writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+ writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
topologyVersion(topVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4cd9e84..7f9edb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f03b461..83c220d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
@@ -688,8 +689,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (map == null || map.isEmpty())
return;
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+ Collection<GridCacheVersion> committed = versPair.get1();
+ Collection<GridCacheVersion> rolledback = versPair.get2();
for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
ClusterNode n = mapping.getKey();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index af43113..0002180 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
+import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
@@ -55,7 +57,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -293,7 +294,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
txMapping = new GridDhtTxMapping();
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
+ Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
if (!F.isEmpty(writes)) {
for (int cacheId : tx.activeCacheIds()) {
@@ -353,7 +354,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*
* @param mappings Queue of mappings.
*/
- private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
+ private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) {
if (isDone())
return;
@@ -556,7 +557,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
private AtomicBoolean rcvRes = new AtomicBoolean(false);
/** Mappings to proceed prepare. */
- private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+ private Queue<GridDistributedTxMapping> mappings;
/**
* @param m Mapping.
@@ -564,7 +565,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*/
MiniFuture(
GridDistributedTxMapping m,
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+ Queue<GridDistributedTxMapping> mappings
) {
this.m = m;
this.mappings = mappings;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 0e8aa0d..5ab85b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -712,8 +713,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (map == null || map.isEmpty())
return;
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+ IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+ Collection<GridCacheVersion> committed = versPair.get1();
+ Collection<GridCacheVersion> rolledback = versPair.get2();
for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
ClusterNode n = mapping.getKey();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 46c9f3e..a9dbda2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -88,15 +87,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/** Commit flag. */
private boolean commit;
- /** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
-
/** Node mappings. */
- private ConcurrentMap<UUID, GridDistributedTxMapping> mappings;
+ private Map<UUID, GridDistributedTxMapping> mappings;
/** Trackable flag. */
private boolean trackable = true;
+ /** */
+ private boolean finishOnePhaseCalled;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -176,38 +175,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- * @param e Error.
- */
- void onError(Throwable e) {
- tx.commitError(e);
-
- if (err.compareAndSet(null, e)) {
- boolean marked = tx.setRollbackOnly();
-
- if (e instanceof IgniteTxRollbackCheckedException) {
- if (marked) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
- }
- }
- }
- else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error.
- try {
- tx.close();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to invalidate transaction: " + tx, ex);
- }
- }
-
- onComplete();
- }
- }
-
- /**
* @param nodeId Sender.
* @param res Result.
*/
@@ -247,24 +214,56 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/** {@inheritDoc} */
@Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
- if ((initialized() || err != null)) {
- if (tx.needCheckBackup()) {
- assert tx.onePhaseCommit();
+ if (isDone())
+ return false;
- if (err != null)
- err = new TransactionRollbackException("Failed to commit transaction.", err);
+ synchronized (this) {
+ if (isDone())
+ return false;
- try {
- tx.finish(err == null);
+ if (err != null) {
+ tx.commitError(err);
+
+ boolean marked = tx.setRollbackOnly();
+
+ if (err instanceof IgniteTxRollbackCheckedException) {
+ if (marked) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
+ }
+ }
}
- catch (IgniteCheckedException e) {
- if (err != null)
- err.addSuppressed(e);
- else
- err = e;
+ else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error.
+ try {
+ tx.close();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to invalidate transaction: " + tx, ex);
+ }
}
}
+ if (initialized() || err != null) {
+ if (tx.needCheckBackup()) {
+ assert tx.onePhaseCommit();
+
+ if (err != null)
+ err = new TransactionRollbackException("Failed to commit transaction.", err);
+
+ try {
+ tx.finish(err == null);
+ }
+ catch (IgniteCheckedException e) {
+ if (err != null)
+ err.addSuppressed(e);
+ else
+ err = e;
+ }
+ }
+
if (tx.onePhaseCommit()) {
boolean commit = this.commit && err == null;
@@ -273,36 +272,35 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.tmFinish(commit);
}
- Throwable th = this.err.get();
-
- if (super.onDone(tx0, th != null ? th : err)) {
- if (error() instanceof IgniteTxHeuristicCheckedException) {
- AffinityTopologyVersion topVer = tx.topologyVersion();
+ if (super.onDone(tx0, err)) {
+ if (error() instanceof IgniteTxHeuristicCheckedException) {
+ AffinityTopologyVersion topVer = tx.topologyVersion();
- for (IgniteTxEntry e : tx.writeMap().values()) {
- GridCacheContext cacheCtx = e.context();
+ for (IgniteTxEntry e : tx.writeMap().values()) {
+ GridCacheContext cacheCtx = e.context();
- try {
- if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
- GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
+ try {
+ if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
+ GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
- if (entry != null)
- entry.invalidate(null, tx.xidVersion());
+ if (entry != null)
+ entry.invalidate(null, tx.xidVersion());
+ }
}
- }
- catch (Throwable t) {
- U.error(log, "Failed to invalidate entry.", t);
+ catch (Throwable t) {
+ U.error(log, "Failed to invalidate entry.", t);
- if (t instanceof Error)
- throw (Error)t;
+ if (t instanceof Error)
+ throw (Error)t;
+ }
}
}
- }
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
+ // Don't forget to clean up.
+ cctx.mvcc().removeFuture(this);
- return true;
+ return true;
+ }
}
}
@@ -321,7 +319,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* Completeness callback.
*/
private void onComplete() {
- onDone(tx, err.get());
+ onDone(tx);
}
/**
@@ -354,7 +352,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
markInitialized();
- if (!isSync()) {
+ if (!isSync() && !isDone()) {
boolean complete = true;
for (IgniteInternalFuture<?> f : pending())
@@ -367,15 +365,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else
- onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
+ onDone(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
}
catch (Error | RuntimeException e) {
- onError(e);
+ onDone(e);
throw e;
}
catch (IgniteCheckedException e) {
- onError(e);
+ onDone(e);
}
}
@@ -415,7 +413,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
"(backup has left grid): " + tx.xidVersion(), cause));
}
else if (backup.isLocal()) {
- boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion());
+ boolean committed = !cctx.tm().addRolledbackTx(tx);
readyNearMappingFromBackup(mapping);
@@ -515,6 +513,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param commit Commit flag.
*/
private void finishOnePhase(boolean commit) {
+ assert Thread.holdsLock(this);
+
+ if (finishOnePhaseCalled)
+ return;
+
+ finishOnePhaseCalled = true;
+
// No need to send messages as transaction was already committed on remote node.
// Finish local mapping only as we need send commit message to backups.
for (GridDistributedTxMapping m : mappings.values()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 883c285..db4a4b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
@@ -88,7 +87,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** DHT mappings. */
- private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
+ private Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
/** Future. */
@GridToStringExclude
@@ -217,7 +216,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override protected IgniteInternalFuture<Boolean> addReader(
- long msgId,
+ long msgId,
GridDhtCacheEntry cached,
IgniteTxEntry entry,
AffinityTopologyVersion topVer
@@ -472,7 +471,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* @return DHT map.
*/
- ConcurrentMap<UUID, GridDistributedTxMapping> mappings() {
+ Map<UUID, GridDistributedTxMapping> mappings() {
return mappings;
}
@@ -798,14 +797,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
catch (Error | RuntimeException e) {
commitErr.compareAndSet(null, e);
- fut0.onError(e);
+ fut0.onDone(e);
throw e;
}
catch (IgniteCheckedException e) {
commitErr.compareAndSet(null, e);
- fut0.onError(e);
+ fut0.onDone(e);
}
}
});
@@ -1152,8 +1151,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override protected GridCacheEntryEx entryEx(
- GridCacheContext cacheCtx,
- IgniteTxKey key,
+ GridCacheContext cacheCtx,
+ IgniteTxKey key,
AffinityTopologyVersion topVer
) {
if (cacheCtx.isColocated()) {
@@ -1245,7 +1244,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
@Override public void onRemap(AffinityTopologyVersion topVer) {
assert cctx.kernalContext().clientNode();
- mapped.set(false);
+ mapped = false;
nearLocallyMapped = false;
colocatedLocallyMapped = false;
txNodes = null;
@@ -1254,7 +1253,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
dhtMap.clear();
mappings.clear();
- this.topVer.set(topVer);
+ synchronized (this) {
+ this.topVer = topVer;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 20fb8c2..94af6bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -536,9 +536,8 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* @param commitVer Commit version.
- * @return {@code True} if version was set.
*/
- public boolean commitVersion(GridCacheVersion commitVer);
+ public void commitVersion(GridCacheVersion commitVer);
/**
* @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
[2/3] ignite git commit: Performance optimizations.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1c82636..eb2ca2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -24,16 +24,14 @@ import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
@@ -47,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -59,7 +56,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
@@ -199,20 +195,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
protected boolean transform;
/** Commit version. */
- private AtomicReference<GridCacheVersion> commitVer = new AtomicReference<>(null);
-
- /** Done marker. */
- protected final AtomicBoolean isDone = new AtomicBoolean(false);
+ private volatile GridCacheVersion commitVer;
/** */
private AtomicReference<FinalizationStatus> finalizing = new AtomicReference<>(FinalizationStatus.NONE);
- /** Preparing flag. */
- private AtomicBoolean preparing = new AtomicBoolean();
+ /** Done marker. */
+ protected volatile boolean isDone;
+
+ /** Preparing flag (no need for volatile modifier). */
+ private boolean preparing;
/** */
@GridToStringInclude
- private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
+ private Map<Integer, Set<Integer>> invalidParts;
/**
* Transaction state. Note that state is not protected, as we want to
@@ -230,17 +226,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** */
@GridToStringExclude
- private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
+ private volatile GridFutureAdapter<IgniteInternalTx> finFut;
/** Topology version. */
@GridToStringInclude
- protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
-
- /** Mutex. */
- private final Lock lock = new ReentrantLock();
-
- /** Lock condition. */
- private final Condition cond = lock.newCondition();
+ protected volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** */
protected Map<UUID, Collection<UUID>> txNodes;
@@ -387,37 +377,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/**
- * Acquires lock.
- */
- @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
- protected final void lock() {
- lock.lock();
- }
-
- /**
- * Releases lock.
- */
- protected final void unlock() {
- lock.unlock();
- }
-
- /**
- * Signals all waiters.
- */
- protected final void signalAll() {
- cond.signalAll();
- }
-
- /**
- * Waits for signal.
- *
- * @throws InterruptedException If interrupted.
- */
- protected final void awaitSignal() throws InterruptedException {
- cond.await();
- }
-
- /**
* Checks whether near cache should be updated.
*
* @return Flag indicating whether near cache should be updated.
@@ -548,7 +507,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
- AffinityTopologyVersion res = topVer.get();
+ AffinityTopologyVersion res = topVer;
if (res.equals(AffinityTopologyVersion.NONE))
return cctx.exchange().topologyVersion();
@@ -558,16 +517,29 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersionSnapshot() {
- AffinityTopologyVersion ret = topVer.get();
+ AffinityTopologyVersion ret = topVer;
return AffinityTopologyVersion.NONE.equals(ret) ? null : ret;
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
- this.topVer.compareAndSet(AffinityTopologyVersion.NONE, topVer);
+ AffinityTopologyVersion topVer0 = this.topVer;
- return this.topVer.get();
+ if (!AffinityTopologyVersion.NONE.equals(topVer0))
+ return topVer0;
+
+ synchronized (this) {
+ topVer0 = this.topVer;
+
+ if (AffinityTopologyVersion.NONE.equals(topVer0)) {
+ this.topVer = topVer;
+
+ return topVer;
+ }
+
+ return topVer0;
+ }
}
/** {@inheritDoc} */
@@ -582,7 +554,14 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public boolean markPreparing() {
- return preparing.compareAndSet(false, true);
+ synchronized (this) {
+ if (preparing)
+ return false;
+
+ preparing = true;
+
+ return true;
+ }
}
/**
@@ -730,15 +709,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public Map<Integer, Set<Integer>> invalidPartitions() {
- return invalidParts;
+ return invalidParts == null ? Collections.<Integer, Set<Integer>>emptyMap() : invalidParts;
}
/** {@inheritDoc} */
@Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) {
+ if (invalidParts == null)
+ invalidParts = new HashMap<>();
+
Set<Integer> parts = invalidParts.get(cacheCtx.cacheId());
if (parts == null) {
- parts = new GridLeanSet<>();
+ parts = new HashSet<>();
invalidParts.put(cacheCtx.cacheId(), parts);
}
@@ -879,32 +861,71 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public boolean done() {
- return isDone.get();
+ return isDone;
}
/**
- * @return Commit version.
+ * @return {@code True} if done flag has been set by this call.
*/
- @Override public GridCacheVersion commitVersion() {
- initCommitVersion();
+ private boolean setDone() {
+ boolean isDone0 = isDone;
+
+ if (isDone0)
+ return false;
+
+ synchronized (this) {
+ isDone0 = isDone;
- return commitVer.get();
+ if (isDone0)
+ return false;
+
+ isDone = true;
+
+ return true;
+ }
}
/**
- * @param commitVer Commit version.
- * @return {@code True} if set to not null value.
+ * @return Commit version.
*/
- @Override public boolean commitVersion(GridCacheVersion commitVer) {
- return commitVer != null && this.commitVer.compareAndSet(null, commitVer);
+ @Override public GridCacheVersion commitVersion() {
+ GridCacheVersion commitVer0 = commitVer;
+
+ if (commitVer0 != null)
+ return commitVer0;
+
+ synchronized (this) {
+ commitVer0 = commitVer;
+
+ if (commitVer0 != null)
+ return commitVer0;
+
+ commitVer = commitVer0 = xidVer;
+
+ return commitVer0;
+ }
}
/**
- *
+ * @param commitVer Commit version.
*/
- public void initCommitVersion() {
- if (commitVer.get() == null)
- commitVer.compareAndSet(null, xidVer);
+ @Override public void commitVersion(GridCacheVersion commitVer) {
+ if (commitVer == null)
+ return;
+
+ GridCacheVersion commitVer0 = this.commitVer;
+
+ if (commitVer0 != null)
+ return;
+
+ synchronized (this) {
+ commitVer0 = this.commitVer;
+
+ if (commitVer0 != null)
+ return;
+
+ this.commitVer = commitVer;
+ }
}
/**
@@ -916,7 +937,19 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
rollback();
- awaitCompletion();
+ synchronized (this) {
+ try {
+ while (!done())
+ wait();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ if (!done())
+ throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " +
+ this, e);
+ }
+ }
}
/** {@inheritDoc} */
@@ -930,29 +963,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/* No-op. */
}
- /**
- * Awaits transaction completion.
- *
- * @throws IgniteCheckedException If waiting failed.
- */
- protected void awaitCompletion() throws IgniteCheckedException {
- lock();
-
- try {
- while (!done())
- awaitSignal();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- if (!done())
- throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + this, e);
- }
- finally {
- unlock();
- }
- }
-
/** {@inheritDoc} */
@Override public boolean internal() {
return internal;
@@ -1019,22 +1029,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
@Override public IgniteInternalFuture<IgniteInternalTx> finishFuture() {
- GridFutureAdapter<IgniteInternalTx> fut = finFut.get();
+ GridFutureAdapter<IgniteInternalTx> fut = finFut;
if (fut == null) {
- fut = new GridFutureAdapter<IgniteInternalTx>() {
- @Override public String toString() {
- return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this);
- }
- };
+ synchronized (this) {
+ fut = finFut;
- if (!finFut.compareAndSet(null, fut))
- fut = finFut.get();
+ if (fut == null) {
+ fut = new GridFutureAdapter<IgniteInternalTx>() {
+ @Override public String toString() {
+ return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this);
+ }
+ };
+
+ finFut = fut;
+ }
+ }
}
assert fut != null;
- if (isDone.get())
+ if (isDone)
fut.onDone(this);
return fut;
@@ -1059,9 +1074,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
boolean notify = false;
- lock();
-
- try {
+ synchronized (this) {
prev = this.state;
switch (state) {
@@ -1087,7 +1100,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
case UNKNOWN: {
- if (isDone.compareAndSet(false, true))
+ if (setDone())
notify = true;
valid = prev == ROLLING_BACK || prev == COMMITTING;
@@ -1096,7 +1109,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
case COMMITTED: {
- if (isDone.compareAndSet(false, true))
+ if (setDone())
notify = true;
valid = prev == COMMITTING;
@@ -1105,7 +1118,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
case ROLLED_BACK: {
- if (isDone.compareAndSet(false, true))
+ if (setDone())
notify = true;
valid = prev == ROLLING_BACK;
@@ -1135,8 +1148,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
if (log.isDebugEnabled())
log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
- // Notify of state change.
- signalAll();
+ notifyAll();
}
else {
if (log.isDebugEnabled())
@@ -1144,12 +1156,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
", tx=" + this + ']');
}
}
- finally {
- unlock();
- }
if (notify) {
- GridFutureAdapter<IgniteInternalTx> fut = finFut.get();
+ GridFutureAdapter<IgniteInternalTx> fut = finFut;
if (fut != null)
fut.onDone(this);
@@ -2026,8 +2035,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public boolean commitVersion(GridCacheVersion commitVer) {
- return false;
+ @Override public void commitVersion(GridCacheVersion commitVer) {
+ // No-op.
}
/** {@inheritDoc} */
@@ -2037,7 +2046,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public void prepare() throws IgniteCheckedException {
-
+ // No-op.
}
/** {@inheritDoc} */
@@ -2047,7 +2056,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public void endVersion(GridCacheVersion endVer) {
-
+ // No-op.
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d9786a8..570aa48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -876,7 +876,7 @@ public class IgniteTxHandler {
log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
if (req.checkCommitted()) {
- sendReply(nodeId, req, checkDhtRemoteTxCommitted(req.version()));
+ sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version()));
return;
}
@@ -896,8 +896,11 @@ public class IgniteTxHandler {
if (req.replyRequired()) {
IgniteInternalFuture completeFut;
- IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture();
- IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture();
+ IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
+ null : dhtTx.done() ? null : dhtTx.finishFuture();
+
+ IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
+ null : nearTx.done() ? null : nearTx.finishFuture();
if (dhtFin != null && nearFin != null) {
GridCompoundFuture fut = new GridCompoundFuture();
@@ -914,8 +917,7 @@ public class IgniteTxHandler {
if (completeFut != null) {
completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override
- public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
sendReply(nodeId, req, true);
}
});
@@ -928,24 +930,6 @@ public class IgniteTxHandler {
}
/**
- * Checks whether DHT remote transaction with given version has been committed. If not, will add version
- * to rollback version set so that late response will not falsely commit this transaction.
- *
- * @param writeVer Write version to check.
- * @return {@code True} if transaction has been committed, {@code false} otherwise.
- */
- public boolean checkDhtRemoteTxCommitted(GridCacheVersion writeVer) {
- assert writeVer != null;
-
- boolean committed = true;
-
- if (ctx.tm().addRolledbackTx(writeVer))
- committed = false;
-
- return committed;
- }
-
- /**
* @param nodeId Node ID.
* @param tx Transaction.
* @param req Request.
@@ -953,7 +937,8 @@ public class IgniteTxHandler {
protected void finish(
UUID nodeId,
IgniteTxRemoteEx tx,
- GridDhtTxFinishRequest req) {
+ GridDhtTxFinishRequest req
+ ) {
// We don't allow explicit locks for transactions and
// therefore immediately return if transaction is null.
// However, we may decide to relax this restriction in
@@ -961,9 +946,9 @@ public class IgniteTxHandler {
if (tx == null) {
if (req.commit())
// Must be some long time duplicate, but we add it anyway.
- ctx.tm().addCommittedTx(req.version(), null);
+ ctx.tm().addCommittedTx(tx, req.version(), null);
else
- ctx.tm().addRolledbackTx(req.version());
+ ctx.tm().addRolledbackTx(tx, req.version());
if (log.isDebugEnabled())
log.debug("Received finish request for non-existing transaction (added to completed set) " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 82e5f2a..2c7bf8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -206,21 +206,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
int taskNameHash
) {
super(
- cctx,
- xidVer,
- implicit,
- implicitSingle,
- /*local*/true,
- sys,
+ cctx,
+ xidVer,
+ implicit,
+ implicitSingle,
+ /*local*/true,
+ sys,
plc,
- concurrency,
- isolation,
+ concurrency,
+ isolation,
timeout,
invalidate,
- storeEnabled,
- onePhaseCommit,
- txSize,
- subjId,
+ storeEnabled,
+ onePhaseCommit,
+ txSize,
+ subjId,
taskNameHash
);
@@ -1054,7 +1054,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
eventNodeId(),
txEntry.nodeId(),
false,
- false,
evt,
metrics,
topVer,
@@ -1072,7 +1071,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
nodeId,
false,
false,
- false,
metrics,
topVer,
CU.empty0(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 1f51b8a..c2e7dea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -18,17 +18,16 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.io.Externalizable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -62,6 +61,7 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -75,6 +75,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedHashMap;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
@@ -95,6 +96,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
* Cache transaction manager.
@@ -128,8 +130,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private IgniteTxHandler txHandler;
/** Committed local transactions. */
- private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers =
- new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
+ private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted =
+ new GridBoundedConcurrentOrderedMap<>(
+ Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
+
+ /** Committed local transactions. */
+ private final ConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap =
+ new ConcurrentLinkedHashMap<>(
+ Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
+ 0.75f,
+ Runtime.getRuntime().availableProcessors() * 2,
+ Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
+ PER_SEGMENT_Q);
/** Transaction finish synchronizer. */
private GridCacheTxFinishSync txFinishSync;
@@ -298,7 +310,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']');
X.println(">>> threadMapSize: " + threadMap.size());
X.println(">>> idMap [size=" + idMap.size() + ']');
- X.println(">>> completedVersSize: " + completedVers.size());
+ X.println(">>> completedVersSortedSize: " + completedVersSorted.size());
+ X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex());
}
/**
@@ -319,7 +332,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Committed versions size.
*/
public int completedVersionsSize() {
- return completedVers.size();
+ return completedVersHashMap.size();
}
/**
@@ -329,7 +342,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* {@code false} otherwise.
*/
public boolean isCompleted(IgniteInternalTx tx) {
- return completedVers.containsKey(tx.xidVersion());
+ return completedVersHashMap.containsKey(tx.xidVersion());
}
/**
@@ -770,65 +783,59 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param map Collection to copy.
- * @param expVal Values to copy.
- * @return Copy of the collection.
+ * @param min Minimum version.
+ * @return Pair [committed, rolledback] - never {@code null}, elements potentially empty,
+ * but also never {@code null}.
*/
- private Collection<GridCacheVersion> copyOf(Map<GridCacheVersion, Boolean> map, boolean expVal) {
- Collection<GridCacheVersion> l = new LinkedList<>();
+ public IgnitePair<Collection<GridCacheVersion>> versions(GridCacheVersion min) {
+ Collection<GridCacheVersion> committed = null;
+ Collection<GridCacheVersion> rolledback = null;
- for (Map.Entry<GridCacheVersion, Boolean> e : map.entrySet()) {
- if (e.getValue() == expVal)
- l.add(e.getKey());
- }
+ for (Map.Entry<GridCacheVersion, Boolean> e : completedVersSorted.tailMap(min, true).entrySet()) {
+ if (e.getValue()) {
+ if (committed == null)
+ committed = new ArrayList<>();
- return l;
- }
+ committed.add(e.getKey());
+ }
+ else {
+ if (rolledback == null)
+ rolledback = new ArrayList<>();
- /**
- * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
- *
- * @param min Start (or minimum) version.
- * @return Committed transactions starting from the given version (non-inclusive).
- */
- public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) {
- ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
- = completedVers.tailMap(min, true);
+ rolledback.add(e.getKey());
+ }
+ }
- return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, true);
+ return F.pair(
+ committed == null ? Collections.<GridCacheVersion>emptyList() : committed,
+ rolledback == null ? Collections.<GridCacheVersion>emptyList() : rolledback);
}
/**
- * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
- *
- * @param min Start (or minimum) version.
- * @return Committed transactions starting from the given version (non-inclusive).
+ * @return Collection of active transactions.
*/
- public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) {
- ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
- = completedVers.tailMap(min, true);
-
- return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, false);
+ public Collection<IgniteInternalTx> activeTransactions() {
+ return F.concat(false, idMap.values(), nearIdMap.values());
}
/**
* @param tx Tx to remove.
*/
public void removeCommittedTx(IgniteInternalTx tx) {
- completedVers.remove(tx.xidVersion(), true);
+ completedVersHashMap.remove(tx.xidVersion(), true);
+
+ if (tx.needsCompletedVersions())
+ completedVersSorted.remove(tx.xidVersion(), true);
}
/**
* @param tx Committed transaction.
- * @return If transaction was not already present in committed set.
*/
- public boolean addCommittedTx(IgniteInternalTx tx) {
- boolean res = addCommittedTx(tx.xidVersion(), tx.nearXidVersion());
+ public void addCommittedTx(IgniteInternalTx tx) {
+ addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion());
if (!tx.local() && !tx.near() && tx.onePhaseCommit())
- addCommittedTx(tx.nearXidVersion(), null);
-
- return res;
+ addCommittedTx(tx, tx.nearXidVersion(), null);
}
/**
@@ -836,60 +843,52 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return If transaction was not already present in committed set.
*/
public boolean addRolledbackTx(IgniteInternalTx tx) {
- return addRolledbackTx(tx.xidVersion());
- }
-
- /**
- * @return Collection of active transactions.
- */
- public Collection<IgniteInternalTx> activeTransactions() {
- return F.concat(false, idMap.values(), nearIdMap.values());
+ return addRolledbackTx(tx, tx.xidVersion());
}
/**
+ * @param tx Tx.
* @param xidVer Completed transaction version.
* @param nearXidVer Optional near transaction ID.
* @return If transaction was not already present in completed set.
*/
- public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) {
+ public boolean addCommittedTx(
+ IgniteInternalTx tx,
+ GridCacheVersion xidVer,
+ @Nullable GridCacheVersion nearXidVer
+ ) {
if (nearXidVer != null)
xidVer = new CommittedVersion(xidVer, nearXidVer);
- Boolean committed = completedVers.putIfAbsent(xidVer, true);
+ Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
- if (committed == null || committed) {
- if (log.isDebugEnabled())
- log.debug("Added transaction to committed version set: " + xidVer);
+ if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
+ Boolean b = completedVersSorted.putIfAbsent(xidVer, true);
- return true;
+ assert b == null;
}
- else {
- if (log.isDebugEnabled())
- log.debug("Transaction is already present in rolled back version set: " + xidVer);
- return false;
- }
+ return committed0 == null || committed0;
}
/**
+ * @param tx Tx.
* @param xidVer Completed transaction version.
* @return If transaction was not already present in completed set.
*/
- public boolean addRolledbackTx(GridCacheVersion xidVer) {
- Boolean committed = completedVers.putIfAbsent(xidVer, false);
+ public boolean addRolledbackTx(
+ IgniteInternalTx tx,
+ GridCacheVersion xidVer
+ ) {
+ Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
- if (committed == null || !committed) {
- if (log.isDebugEnabled())
- log.debug("Added transaction to rolled back version set: " + xidVer);
+ if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
+ Boolean b = completedVersSorted.putIfAbsent(xidVer, false);
- return true;
+ assert b == null;
}
- else {
- if (log.isDebugEnabled())
- log.debug("Transaction is already present in committed version set: " + xidVer);
- return false;
- }
+ return committed0 == null || !committed0;
}
/**
@@ -903,7 +902,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert min != null;
- tx.completedVersions(min, committedVersions(min), rolledbackVersions(min));
+ IgnitePair<Collection<GridCacheVersion>> versPair = versions(min);
+
+ tx.completedVersions(min, versPair.get1(), versPair.get2());
}
}
@@ -1027,18 +1028,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* so we don't do it here.
*/
- Boolean committed = completedVers.get(tx.xidVersion());
+ Boolean committed = completedVersHashMap.get(tx.xidVersion());
// 1. Make sure that committed version has been recorded.
if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
uncommitTx(tx);
- GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey();
- GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey();
-
throw new IgniteException("Missing commit version (consider increasing " +
- IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
- first + ", lastVer=" + last + ", tx=" + tx.xid() + ']');
+ IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
+ ", tx=" + tx.getClass().getSimpleName() + ']');
}
ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
@@ -1578,12 +1576,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return resFut;
}
- Boolean committed = null;
-
- for (Map.Entry<GridCacheVersion, Boolean> entry : completedVers.entrySet()) {
- if (entry.getValue() == null)
- continue;
+ boolean committed = false;
+ for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) {
if (entry.getKey() instanceof CommittedVersion) {
CommittedVersion comm = (CommittedVersion)entry.getKey();
@@ -1598,7 +1593,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Near transaction committed: " + committed);
- resFut.onDone(committed != null && committed);
+ resFut.onDone(committed);
return resFut;
}
@@ -1702,7 +1697,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Not all transactions were found. Need to scan committed versions to check
// if transaction was already committed.
- for (Map.Entry<GridCacheVersion, Boolean> e : completedVers.entrySet()) {
+ for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) {
if (!e.getValue())
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
index 04d1a85..7aa3734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
@@ -116,9 +116,6 @@ public class GridBoundedConcurrentLinkedHashMap<K, V> extends ConcurrentLinkedHa
/** {@inheritDoc} */
@Override public String toString() {
- // TODO GG-4788
- return policy() != SINGLE_Q ?
- S.toString(GridBoundedConcurrentLinkedHashMap.class, this) :
- S.toString(GridBoundedConcurrentLinkedHashMap.class, this, "entrySet", keySet());
+ return S.toString(GridBoundedConcurrentLinkedHashMap.class, this, "entrySet", keySet());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
index a06f2ff..2801839 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
@@ -156,9 +156,6 @@ public class GridBoundedConcurrentLinkedHashSet<E> extends GridSetWrapper<E> {
/** {@inheritDoc} */
@Override public String toString() {
- // TODO GG-4788
- return ((ConcurrentLinkedHashMap<E, Object>)map()).policy() != SINGLE_Q ?
- S.toString(GridBoundedConcurrentLinkedHashSet.class, this) :
- S.toString(GridBoundedConcurrentLinkedHashSet.class, this, "elements", map().keySet());
+ return S.toString(GridBoundedConcurrentLinkedHashSet.class, this, "elements", map().keySet());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
index b091652..3f6db30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.util;
import java.util.Comparator;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,7 +45,7 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
private static final long serialVersionUID = 0L;
/** Element count. */
- private final AtomicInteger cnt = new AtomicInteger(0);
+ private final AtomicInteger cnt = new AtomicInteger();
/** Maximum size. */
private int max;
@@ -168,35 +167,21 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
private void onPut() {
cnt.incrementAndGet();
- int c;
+ IgniteBiInClosure<K, V> lsnr = this.lsnr;
- while ((c = cnt.get()) > max) {
- // Decrement count.
- if (cnt.compareAndSet(c, c - 1)) {
- try {
- K key = firstEntry().getKey();
+ int delta = cnt.get() - max;
- V val;
+ for (int i = 0; i < delta && cnt.get() > max; i++) {
+ Entry<K, V> e = pollFirstEntry();
- // Make sure that an element is removed.
- while ((val = super.remove(firstEntry().getKey())) == null) {
- // No-op.
- }
+ if (e == null)
+ return;
- assert val != null;
-
- IgniteBiInClosure<K, V> lsnr = this.lsnr;
-
- // Listener notification.
- if (lsnr != null)
- lsnr.apply(key, val);
- }
- catch (NoSuchElementException ignored) {
- cnt.incrementAndGet();
+ cnt.decrementAndGet();
- return;
- }
- }
+ // Listener notification.
+ if (lsnr != null)
+ lsnr.apply(e.getKey(), e.getValue());
}
}
@@ -251,4 +236,4 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
return rmvd;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
index 6e0e876..d1a7bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
@@ -18,18 +18,12 @@
package org.apache.ignite.internal.util;
import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteSystemProperties;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAP_CONCURRENCY_LEVEL;
-
/**
* Concurrent map factory.
*/
public class GridConcurrentFactory {
- /** Default concurrency level. */
- private static final int CONCURRENCY_LEVEL = IgniteSystemProperties.getInteger(IGNITE_MAP_CONCURRENCY_LEVEL, 256);
-
/**
* Ensure singleton.
*/
@@ -43,7 +37,6 @@ public class GridConcurrentFactory {
* @return New concurrent map.
*/
public static <K, V> ConcurrentMap<K, V> newMap() {
- return new ConcurrentHashMap8<>(16 * CONCURRENCY_LEVEL, 0.75f, CONCURRENCY_LEVEL);
+ return new ConcurrentHashMap8<>();
}
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
index 5a53b4b..0c76787 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
@@ -24,8 +24,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
-import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.SINGLE_Q;
-
/**
* Concurrent linked set implementation.
*/
@@ -123,9 +121,6 @@ public class GridConcurrentLinkedHashSet<E> extends GridSetWrapper<E> {
/** {@inheritDoc} */
@Override public String toString() {
- // TODO GG-4788
- return ((ConcurrentLinkedHashMap)map()).policy() != SINGLE_Q ?
- S.toString(GridConcurrentLinkedHashSet.class, this) :
- S.toString(GridConcurrentLinkedHashSet.class, this, "elements", map().keySet());
+ return S.toString(GridConcurrentLinkedHashSet.class, this, "elements", map().keySet());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
index 4ca00d9..d9ffdd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.util;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+
/**
*
*/
@@ -29,7 +31,7 @@ public final class IgniteUuidCache {
/** Cache. */
private static final ConcurrentMap<UUID, UUID> cache =
- new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64);
+ new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64, PER_SEGMENT_Q);
/**
* Gets cached UUID to preserve memory.
@@ -56,4 +58,4 @@ public final class IgniteUuidCache {
private IgniteUuidCache() {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 0a6d9aa..31674f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.util.future;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicMarkableReference;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -35,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
/**
* Future composed of multiple inner futures.
@@ -44,33 +41,38 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** */
private static final long serialVersionUID = 0L;
- /** Futures. */
- private final ConcurrentLinkedDeque8<IgniteInternalFuture<T>> futs = new ConcurrentLinkedDeque8<>();
+ /** */
+ private static final int INITED = 0b1;
- /** Pending futures. */
- private final Collection<IgniteInternalFuture<T>> pending = new ConcurrentLinkedDeque8<>();
+ /** */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> flagsUpd =
+ AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags");
- /** Listener call count. */
- private final AtomicInteger lsnrCalls = new AtomicInteger();
+ /** */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> lsnrCallsUpd =
+ AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
- /** Finished flag. */
- private final AtomicBoolean finished = new AtomicBoolean();
+ /** Futures. */
+ private final Collection<IgniteInternalFuture<T>> futs = new ArrayList<>();
/** Reducer. */
@GridToStringInclude
private IgniteReducer<T, R> rdc;
- /** Initialize flag. */
- private AtomicBoolean init = new AtomicBoolean(false);
-
- /** Result with a flag to control if reducer has been called. */
- private AtomicMarkableReference<R> res = new AtomicMarkableReference<>(null, false);
-
/** Exceptions to ignore. */
private Class<? extends Throwable>[] ignoreChildFailures;
- /** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>();
+ /**
+ * Updated via {@link #flagsUpd}.
+ *
+ * @see #INITED
+ */
+ @SuppressWarnings("unused")
+ private volatile int flags;
+
+ /** Updated via {@link #lsnrCallsUpd}. */
+ @SuppressWarnings("unused")
+ private volatile int lsnrCalls;
/**
*
@@ -104,7 +106,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
if (onCancelled()) {
- for (IgniteInternalFuture<T> fut : futs)
+ for (IgniteInternalFuture<T> fut : futures())
fut.cancel();
return true;
@@ -118,8 +120,26 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*
* @return Collection of futures.
*/
+ private Collection<IgniteInternalFuture<T>> futures(boolean pending) {
+ synchronized (futs) {
+ Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size());
+
+ for (IgniteInternalFuture<T> fut : futs) {
+ if (!pending || !fut.isDone())
+ res.add(fut);
+ }
+
+ return res;
+ }
+ }
+
+ /**
+ * Gets collection of futures.
+ *
+ * @return Collection of futures.
+ */
public Collection<IgniteInternalFuture<T>> futures() {
- return futs;
+ return futures(false);
}
/**
@@ -128,7 +148,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @return Pending futures.
*/
public Collection<IgniteInternalFuture<T>> pending() {
- return pending;
+ return futures(true);
}
/**
@@ -147,7 +167,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @return {@code True} if there are pending futures.
*/
public boolean hasPending() {
- return !pending.isEmpty();
+ return !pending().isEmpty();
}
/**
@@ -155,7 +175,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* {@link #markInitialized()} method is called on future.
*/
public boolean initialized() {
- return init.get();
+ return flagSet(INITED);
}
/**
@@ -166,18 +186,20 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
public void add(IgniteInternalFuture<T> fut) {
assert fut != null;
- pending.add(fut);
- futs.add(fut);
+ synchronized (futs) {
+ futs.add(fut);
+ }
fut.listen(new Listener());
- if (isCancelled())
+ if (isCancelled()) {
try {
fut.cancel();
}
catch (IgniteCheckedException e) {
onDone(e);
}
+ }
}
/**
@@ -185,7 +207,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*
* @param futs Futures to add.
*/
- public void addAll(@Nullable IgniteInternalFuture<T>... futs) {
+ @SafeVarargs
+ public final void addAll(@Nullable IgniteInternalFuture<T>... futs) {
addAll(F.asList(futs));
}
@@ -195,9 +218,10 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @param futs Futures to add.
*/
public void addAll(@Nullable Iterable<IgniteInternalFuture<T>> futs) {
- if (futs != null)
+ if (futs != null) {
for (IgniteInternalFuture<T> fut : futs)
add(fut);
+ }
}
/**
@@ -219,10 +243,34 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
/**
+ * @param flag Flag to CAS.
+ * @return {@code True} if CAS succeeds.
+ */
+ private boolean casFlag(int flag) {
+ for (;;) {
+ int flags0 = flags;
+
+ if ((flags0 & flag) != 0)
+ return false;
+
+ if (flagsUpd.compareAndSet(this, flags0, flags0 | flag))
+ return true;
+ }
+ }
+
+ /**
+ * @param flag Flag to check.
+ * @return {@code True} if set.
+ */
+ private boolean flagSet(int flag) {
+ return (flags & flag) != 0;
+ }
+
+ /**
* Mark this future as initialized.
*/
public void markInitialized() {
- if (init.compareAndSet(false, true))
+ if (casFlag(INITED))
// Check complete to make sure that we take care
// of all the ignored callbacks.
checkComplete();
@@ -232,22 +280,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* Check completeness of the future.
*/
private void checkComplete() {
- Throwable err = this.err.get();
-
- boolean ignore = ignoreFailure(err);
-
- if (init.get() && (res.isMarked() || lsnrCalls.get() == futs.sizex() || (err != null && !ignore))
- && finished.compareAndSet(false, true)) {
+ if (flagSet(INITED) && !isDone() && lsnrCalls == futuresSize()) {
try {
- if (err == null && rdc != null && !res.isMarked())
- res.compareAndSet(null, rdc.reduce(), false, true);
+ onDone(rdc != null ? rdc.reduce() : null);
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
-
- return;
}
catch (AssertionError e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -256,8 +296,15 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
throw e;
}
+ }
+ }
- onDone(res.getReference(), ignore ? null : err);
+ /**
+ * @return Futures size.
+ */
+ private int futuresSize() {
+ synchronized (futs) {
+ return futs.size();
}
}
@@ -288,7 +335,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
"cancelled", isCancelled(),
"err", error(),
"futs",
- F.viewReadOnly(futs, new C1<IgniteInternalFuture<T>, String>() {
+ F.viewReadOnly(futures(), new C1<IgniteInternalFuture<T>, String>() {
@Override public String apply(IgniteInternalFuture<T> f) {
return Boolean.toString(f.isDone());
}
@@ -305,14 +352,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** {@inheritDoc} */
@Override public void apply(IgniteInternalFuture<T> fut) {
- pending.remove(fut);
-
try {
T t = fut.get();
try {
- if (rdc != null && !rdc.collect(t) && !res.isMarked())
- res.compareAndSet(null, rdc.reduce(), false, true);
+ if (rdc != null && !rdc.collect(t))
+ onDone(rdc.reduce());
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -331,18 +376,20 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException |
ClusterTopologyCheckedException e) {
- err.compareAndSet(null, e);
+ if (!ignoreFailure(e))
+ onDone(e);
}
catch (IgniteCheckedException e) {
- if (!ignoreFailure(e))
+ if (!ignoreFailure(e)) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
- err.compareAndSet(null, e);
+ onDone(e);
+ }
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
- err.compareAndSet(null, e);
+ onDone(e);
}
catch (AssertionError e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -353,7 +400,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
throw e;
}
- lsnrCalls.incrementAndGet();
+ lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this);
checkComplete();
}
@@ -363,4 +410,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
return "Compound future listener: " + GridCompoundFuture.this;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
index d93f12e..b3747d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
@@ -3805,4 +3805,4 @@ public class ConcurrentHashMap8<K, V>
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
index 75db13c..28e38d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
@@ -1735,4 +1735,4 @@ public class ConcurrentLinkedDeque8<E>
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
index 5b7381e..e8f8e0f 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
@@ -17,7 +17,6 @@ import java.util.AbstractSet;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.ConcurrentModificationException;
-import java.util.Deque;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
@@ -28,6 +27,9 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -264,12 +266,14 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
private volatile V val;
/** Reference to a node in queue for fast removal operations. */
+ @GridToStringExclude
private volatile ConcurrentLinkedDeque8.Node node;
/** Modification count of the map for duplicates exclusion. */
private volatile int modCnt;
/** Link to the next entry in a bucket */
+ @GridToStringExclude
private final HashEntry<K, V> next;
/**
@@ -332,6 +336,11 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
static <K, V> HashEntry<K, V>[] newArray(int i) {
return new HashEntry[i];
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HashEntry.class, this, "key", key, "val", val);
+ }
}
/**
@@ -749,7 +758,7 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
recordInsert(e, (ConcurrentLinkedDeque8)segEntryQ);
if (maxCap > 0)
- checkRemoveEldestEntrySegment();
+ checkRemoveEldestEntrySegment(c);
break;
@@ -757,7 +766,7 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
segEntryQ.add(e);
if (maxCap > 0)
- checkRemoveEldestEntrySegment();
+ checkRemoveEldestEntrySegment(c);
break;
@@ -779,23 +788,21 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
}
/**
- *
+ * @param cnt Segment entries count.
*/
- private void checkRemoveEldestEntrySegment() {
+ private void checkRemoveEldestEntrySegment(int cnt) {
assert maxCap > 0;
- int rmvCnt = sizex() - maxCap;
-
- for (int i = 0; i < rmvCnt; i++) {
+ if (cnt - ((maxCap / segments.length) + 1) > 0) {
HashEntry<K, V> e0 = segEntryQ.poll();
- if (e0 == null)
- break;
-
- removeLocked(e0.key, e0.hash, null /*no need to compare*/, false);
+ assert e0 != null;
- if (sizex() <= maxCap)
- break;
+ removeLocked(
+ e0.key,
+ e0.hash,
+ null /*no need to compare*/,
+ false);
}
}
@@ -1812,34 +1819,22 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
* @param asc {@code True} for ascending iterator.
*/
HashIterator(boolean asc) {
- // TODO GG-4788 - Need to fix iterators for ConcurrentLinkedHashMap in perSegment mode
- if (qPlc != SINGLE_Q)
- throw new IllegalStateException("Iterators are not supported in 'perSegmentQueue' modes.");
-
modCnt = ConcurrentLinkedHashMap.this.modCnt.intValue();
// Init delegate.
- delegate = asc ? entryQ.iterator() : entryQ.descendingIterator();
-
- advance();
- }
+ switch (qPlc) {
+ case SINGLE_Q:
+ delegate = asc ? entryQ.iterator() : entryQ.descendingIterator();
- /**
- * @return Copy of the queue.
- */
- private Deque<HashEntry<K, V>> copyQueue() {
- int i = entryQ.sizex();
-
- Deque<HashEntry<K, V>> res = new ArrayDeque<>(i);
-
- Iterator<HashEntry<K, V>> iter = entryQ.iterator();
+ break;
- while (iter.hasNext() && i-- >= 0)
- res.add(iter.next());
+ default:
+ assert qPlc == PER_SEGMENT_Q || qPlc == PER_SEGMENT_Q_OPTIMIZED_RMV : qPlc;
- assert !iter.hasNext() : "Entries queue has been modified.";
+ delegate = new HashIteratorDelegate();
+ }
- return res;
+ advance();
}
/**
@@ -1901,6 +1896,130 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
}
/**
+ *
+ */
+ private class HashIteratorDelegate implements Iterator<HashEntry<K, V>> {
+ /** */
+ private HashEntry<K, V>[] curTbl;
+
+ /** */
+ private int nextSegIdx;
+
+ /** */
+ private int nextTblIdx;
+
+ /** */
+ private HashEntry<K, V> next;
+
+ /** */
+ private HashEntry<K, V> next0;
+
+ /** */
+ private HashEntry<K, V> cur;
+
+ /**
+ *
+ */
+ public HashIteratorDelegate() {
+ nextSegIdx = segments.length - 1;
+ nextTblIdx = -1;
+
+ advance();
+ }
+
+ /**
+ *
+ */
+ private void advance() {
+ if (next0 != null && advanceInBucket(next0, true))
+ return;
+
+ while (nextTblIdx >= 0) {
+ HashEntry<K, V> bucket = curTbl[nextTblIdx--];
+
+ if (bucket != null && advanceInBucket(bucket, false))
+ return;
+ }
+
+ while (nextSegIdx >= 0) {
+ int nextSegIdx0 = nextSegIdx--;
+
+ Segment seg = segments[nextSegIdx0];
+
+ curTbl = seg.tbl;
+
+ for (int j = curTbl.length - 1; j >= 0; --j) {
+ HashEntry<K, V> bucket = curTbl[j];
+
+ if (bucket != null && advanceInBucket(bucket, false)) {
+ nextTblIdx = j - 1;
+
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * @param e Current next.
+ * @return {@code True} if advance succeeded.
+ */
+ @SuppressWarnings( {"unchecked"})
+ private boolean advanceInBucket(@Nullable HashEntry<K, V> e, boolean skipFirst) {
+ if (e == null)
+ return false;
+
+ next0 = e;
+
+ do {
+ if (!skipFirst) {
+ next = next0;
+
+ return true;
+ }
+ else
+ skipFirst = false;
+ }
+ while ((next0 = next0.next) != null);
+
+ assert next0 == null;
+
+ next = null;
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HashEntry<K, V> next() {
+ HashEntry<K, V> e = next;
+
+ if (e == null)
+ throw new NoSuchElementException();
+
+ advance();
+
+ return e;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ if (cur == null)
+ throw new IllegalStateException();
+
+ HashEntry<K, V> e = cur;
+
+ cur = null;
+
+ ConcurrentLinkedHashMap.this.remove(e.key, e.val);
+ }
+ }
+
+ /**
* Key iterator implementation.
*/
private final class KeyIterator extends HashIterator implements Iterator<K>, Enumeration<K> {
@@ -2154,13 +2273,17 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
* the fastest "natural" evicts for bounded maps.
* <p>
* NOTE: Remove operations on map are slower than with other policies.
+ * <p>
+ * NOTE: Iteration order is not preserved, i.e. iteration goes as if it was ordinary hash map.
*/
PER_SEGMENT_Q,
/**
* Instance of {@code GridConcurrentLinkedDequeue} is created for each segment. This gives
* faster "natural" evicts for bounded queues and better remove operation times.
+ * <p>
+ * NOTE: Iteration order is not preserved, i.e. iteration goes as if it was ordinary hash map.
*/
PER_SEGMENT_Q_OPTIMIZED_RMV
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
index 50ba241..616fd43 100644
--- a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
@@ -26,12 +26,18 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
* Tests affinity function with different number of backups.
*/
public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest {
+ /** */
+ private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
/** Number of backups. */
private int backups;
@@ -45,6 +51,8 @@ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(CacheMode.PARTITIONED);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 3a530f2..1d79e20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4696,7 +4696,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
TransactionIsolation txIsolation)
throws Exception
{
- log.info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']');
+ info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']');
cache.removeAll(data.keySet());
checkEmpty(cache, cacheSkipStore);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
index 6a0b9ad..19e49f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -32,13 +33,14 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTest {
/** */
- private volatile Integer failedKey;
+ private volatile boolean putFailed;
/** */
private String maxCompletedTxCount;
@@ -67,6 +69,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes
ccfg.setCacheMode(PARTITIONED);
ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(ccfg);
@@ -90,43 +93,48 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes
final AtomicInteger keyStart = new AtomicInteger();
+ final ConcurrentLinkedDeque<Integer> q = new ConcurrentLinkedDeque<>();
+
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
int start = keyStart.getAndAdd(KEYS_PER_THREAD);
- for (int i = 0; i < KEYS_PER_THREAD && failedKey == null; i++) {
+ for (int i = 0; i < KEYS_PER_THREAD && !putFailed; i++) {
int key = start + i;
try {
cache.put(key, 1);
}
catch (Exception e) {
- log.info("Put failed: " + e);
+ log.info("Put failed [err=" + e + ", i=" + i + ']');
+
+ putFailed = true;
- failedKey = key;
+ q.add(key);
}
}
-
return null;
}
}, 10, "put-thread");
- assertNotNull("Test failed to provoke 'missing commit version' error.", failedKey);
+ assertTrue("Test failed to provoke 'missing commit version' error.", putFailed);
- log.info("Trying to update " + failedKey);
+ for (Integer key : q) {
+ log.info("Trying to update " + key);
- IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+ IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
- asyncCache.put(failedKey, 2);
+ asyncCache.put(key, 2);
- IgniteFuture<?> fut = asyncCache.future();
+ IgniteFuture<?> fut = asyncCache.future();
- try {
- fut.get(5000);
- }
- catch (IgniteFutureTimeoutException ignore) {
- fail("Put failed to finish in 5s.");
+ try {
+ fut.get(5000);
+ }
+ catch (IgniteFutureTimeoutException ignore) {
+ fail("Put failed to finish in 5s: " + key);
+ }
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index abb2767..b93acf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -541,7 +541,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable IgniteInternalTx tx,
UUID evtNodeId,
UUID affNodeId,
- boolean writeThrough,
boolean retval,
boolean evt,
boolean metrics,
@@ -894,4 +893,4 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Override public void onUnlock() {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
index a2440e2..ad51600 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
@@ -513,4 +513,4 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
assertTrue(((IgniteKernal)ignite).internalCache().context().isNear());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index a2308c6..8f5e07b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -1090,4 +1090,4 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
index e6dc7e6..8ce7ae3 100644
--- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
@@ -52,4 +52,4 @@ public class GridBoundedConcurrentLinkedHashMapSelfTest extends GridCommonAbstra
assert it.next() == 9;
assert it.next() == 10;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
index a09ba15..7bcbd07 100644
--- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
@@ -19,13 +19,18 @@ package org.apache.ignite.lang.utils;
import java.util.Date;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jsr166.ConcurrentLinkedHashMap;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
+
/**
* This class tests basic contracts of {@code ConcurrentLinkedHashMap}.
*/
@@ -264,4 +269,59 @@ public class GridConcurrentLinkedHashMapSelfTest extends GridCommonAbstractTest
assert nextVal == -1 : "Unexpected value: " + nextVal;
}
-}
\ No newline at end of file
+
+ /**
+ *
+ */
+ public void testIterationInPerSegmentModes() {
+ checkIteration(PER_SEGMENT_Q);
+ checkIteration(PER_SEGMENT_Q_OPTIMIZED_RMV);
+ }
+
+ /**
+ * @param plc Policy.
+ */
+ private void checkIteration(ConcurrentLinkedHashMap.QueuePolicy plc) {
+ ConcurrentLinkedHashMap<Integer, Integer> map =
+ new ConcurrentLinkedHashMap<>(10,
+ 0.75f,
+ 16,
+ 0,
+ plc);
+
+ Map<Integer, Integer> map0 = new HashMap<>();
+
+ int cnt = 0;
+
+ for (int i = 0; i < 100_000; i++) {
+ int key = ThreadLocalRandom.current().nextInt(15000);
+ int val = ThreadLocalRandom.current().nextInt(15000);
+
+ Integer rmv0 = map0.put(key, val);
+
+ if (rmv0 == null)
+ cnt++;
+
+ Integer rmv = map.put(key, val);
+
+ assertEquals(rmv0, rmv);
+ }
+
+ int checkCnt = 0;
+
+ for (Map.Entry<Integer, Integer> e : map.entrySet()) {
+ checkCnt++;
+
+ Integer rmv = map0.remove(e.getKey());
+
+ assertNotNull(rmv);
+ assertEquals(rmv, e.getValue());
+ }
+
+ assertEquals(cnt, checkCnt);
+
+ info("Puts count: " + cnt);
+
+ assert map0.isEmpty() : map0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 71f3ee3..c19e718 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -95,7 +95,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
* Super class for all common tests.
*/
public abstract class GridCommonAbstractTest extends GridAbstractTest {
- /**Cache peek modes array that consist of only ONHEAP mode. */
+ /** Cache peek modes array that consist of only ONHEAP mode. */
protected static final CachePeekMode[] ONHEAP_PEEK_MODES = new CachePeekMode[] {CachePeekMode.ONHEAP};
/**
@@ -1087,4 +1087,4 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
}
}
-}
\ No newline at end of file
+}