You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/04 10:02:51 UTC
[45/50] ignite git commit: IGNITE-5613 - Fixed race on local sequence
increment and distributed update
IGNITE-5613 - Fixed race on local sequence increment and distributed update
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d42dea8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d42dea8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d42dea8
Branch: refs/heads/master
Commit: 7d42dea8e3e50707c34e8e8d211cd54da1505210
Parents: c4ddda3
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jul 3 17:05:48 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jul 3 17:05:48 2017 +0300
----------------------------------------------------------------------
.../GridCacheAtomicSequenceImpl.java | 55 ++++++++++++--------
...titionedAtomicSequenceMultiThreadedTest.java | 32 ++++++++++++
2 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d42dea8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 725e4aa..47fa49e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -382,39 +382,48 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
long newUpBound;
- curLocVal = locVal;
+ // Even though we hold a transaction lock here, we must hold the local update lock here as well
+ // because we mutate multiple variables (locVal and upBound).
+ localUpdate.lock();
- // If local range was already reserved in another thread.
- if (curLocVal + l <= upBound) {
- locVal = curLocVal + l;
+ try {
+ curLocVal = locVal;
- return updated ? curLocVal + l : curLocVal;
- }
+ // If local range was already reserved in another thread.
+ if (curLocVal + l <= upBound) {
+ locVal = curLocVal + l;
- long curGlobalVal = seq.get();
+ return updated ? curLocVal + l : curLocVal;
+ }
- long newLocVal;
+ long curGlobalVal = seq.get();
- /* We should use offset because we already reserved left side of range.*/
- long off = batchSize > 1 ? batchSize - 1 : 1;
+ long newLocVal;
- // Calculate new values for local counter, global counter and upper bound.
- if (curLocVal + l >= curGlobalVal) {
- newLocVal = curLocVal + l;
+ /* We should use offset because we already reserved left side of range.*/
+ long off = batchSize > 1 ? batchSize - 1 : 1;
- newUpBound = newLocVal + off;
- }
- else {
- newLocVal = curGlobalVal;
+ // Calculate new values for local counter, global counter and upper bound.
+ if (curLocVal + l >= curGlobalVal) {
+ newLocVal = curLocVal + l;
- newUpBound = newLocVal + off;
- }
+ newUpBound = newLocVal + off;
+ }
+ else {
+ newLocVal = curGlobalVal;
- locVal = newLocVal;
- upBound = newUpBound;
+ newUpBound = newLocVal + off;
+ }
- if (updated)
- curLocVal = newLocVal;
+ locVal = newLocVal;
+ upBound = newUpBound;
+
+ if (updated)
+ curLocVal = newLocVal;
+ }
+ finally {
+ localUpdate.unlock();
+ }
// Global counter must be more than reserved upper bound.
seq.set(newUpBound + 1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d42dea8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
index 945650d..4db9bd3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
@@ -26,6 +27,7 @@ import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteAtomicsAbstractTest;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -281,6 +283,36 @@ public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends IgniteA
}
/**
+ * @throws Exception if failed.
+ */
+ public void testMultipleSequences() throws Exception {
+ final int seqCnt = 5;
+ final int threadCnt = 5;
+ final int incCnt = 1_000;
+
+ final IgniteAtomicSequence[] seqs = new IgniteAtomicSequence[seqCnt];
+
+ String seqName = UUID.randomUUID().toString();
+
+ for (int i = 0; i < seqs.length; i++)
+ seqs[i] = grid(0).atomicSequence(seqName, 0, true);
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < incCnt; i++) {
+ for (IgniteAtomicSequence seq : seqs)
+ seq.incrementAndGet();
+ }
+
+ return null;
+ }
+ }, threadCnt, "load");
+
+ for (IgniteAtomicSequence seq : seqs)
+ assertEquals(seqCnt * threadCnt * incCnt, seq.get());
+ }
+
+ /**
* Executes given closure in a given number of threads given number of times.
*
* @param c Closure to execute.