You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/04 10:02:51 UTC

[45/50] ignite git commit: IGNITE-5613 - Fixed race on local sequence increment and distributed update

IGNITE-5613 - Fixed race on local sequence increment and distributed update


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d42dea8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d42dea8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d42dea8

Branch: refs/heads/master
Commit: 7d42dea8e3e50707c34e8e8d211cd54da1505210
Parents: c4ddda3
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jul 3 17:05:48 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jul 3 17:05:48 2017 +0300

----------------------------------------------------------------------
 .../GridCacheAtomicSequenceImpl.java            | 55 ++++++++++++--------
 ...titionedAtomicSequenceMultiThreadedTest.java | 32 ++++++++++++
 2 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d42dea8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 725e4aa..47fa49e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -382,39 +382,48 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
 
                     long newUpBound;
 
-                    curLocVal = locVal;
+                    // Even though we hold a transaction lock here, we must hold the local update lock here as well
+                    // because we mutate multiple variables (locVal and upBound).
+                    localUpdate.lock();
 
-                    // If local range was already reserved in another thread.
-                    if (curLocVal + l <= upBound) {
-                        locVal = curLocVal + l;
+                    try {
+                        curLocVal = locVal;
 
-                        return updated ? curLocVal + l : curLocVal;
-                    }
+                        // If local range was already reserved in another thread.
+                        if (curLocVal + l <= upBound) {
+                            locVal = curLocVal + l;
 
-                    long curGlobalVal = seq.get();
+                            return updated ? curLocVal + l : curLocVal;
+                        }
 
-                    long newLocVal;
+                        long curGlobalVal = seq.get();
 
-                    /* We should use offset because we already reserved left side of range.*/
-                    long off = batchSize > 1 ? batchSize - 1 : 1;
+                        long newLocVal;
 
-                    // Calculate new values for local counter, global counter and upper bound.
-                    if (curLocVal + l >= curGlobalVal) {
-                        newLocVal = curLocVal + l;
+                        /* We should use offset because we already reserved left side of range.*/
+                        long off = batchSize > 1 ? batchSize - 1 : 1;
 
-                        newUpBound = newLocVal + off;
-                    }
-                    else {
-                        newLocVal = curGlobalVal;
+                        // Calculate new values for local counter, global counter and upper bound.
+                        if (curLocVal + l >= curGlobalVal) {
+                            newLocVal = curLocVal + l;
 
-                        newUpBound = newLocVal + off;
-                    }
+                            newUpBound = newLocVal + off;
+                        }
+                        else {
+                            newLocVal = curGlobalVal;
 
-                    locVal = newLocVal;
-                    upBound = newUpBound;
+                            newUpBound = newLocVal + off;
+                        }
 
-                    if (updated)
-                        curLocVal = newLocVal;
+                        locVal = newLocVal;
+                        upBound = newUpBound;
+
+                        if (updated)
+                            curLocVal = newLocVal;
+                    }
+                    finally {
+                        localUpdate.unlock();
+                    }
 
                     // Global counter must be more than reserved upper bound.
                     seq.set(newUpBound + 1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d42dea8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
index 945650d..4db9bd3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
 
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMode;
@@ -26,6 +27,7 @@ import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteAtomicsAbstractTest;
 import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 
@@ -281,6 +283,36 @@ public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends IgniteA
     }
 
     /**
+     * @throws Exception if failed.
+     */
+    public void testMultipleSequences() throws Exception {
+        final int seqCnt = 5;
+        final int threadCnt = 5;
+        final int incCnt = 1_000;
+
+        final IgniteAtomicSequence[] seqs = new IgniteAtomicSequence[seqCnt];
+
+        String seqName = UUID.randomUUID().toString();
+
+        for (int i = 0; i < seqs.length; i++)
+            seqs[i] = grid(0).atomicSequence(seqName, 0, true);
+
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < incCnt; i++) {
+                    for (IgniteAtomicSequence seq : seqs)
+                        seq.incrementAndGet();
+                }
+
+                return null;
+            }
+        }, threadCnt, "load");
+
+        for (IgniteAtomicSequence seq : seqs)
+            assertEquals(seqCnt * threadCnt * incCnt, seq.get());
+    }
+
+    /**
      * Executes given closure in a given number of threads given number of times.
      *
      * @param c Closure to execute.