You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/12/10 04:22:12 UTC

[05/13] ignite git commit: ignite-1.5 Fixed asserts in GridTransactionalCacheQueueImpl to take into account case when queue data was really lost.

ignite-1.5 Fixed asserts in GridTransactionalCacheQueueImpl to take into account case when queue data was really lost.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/add61614
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/add61614
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/add61614

Branch: refs/heads/ignite-843-rc2
Commit: add6161451d3e8b263782e52c51a0e2b34daeb3f
Parents: 3016c0f
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 9 16:32:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 9 16:32:48 2015 +0300

----------------------------------------------------------------------
 .../GridTransactionalCacheQueueImpl.java        |  32 +++--
 ...eAbstractDataStructuresFailoverSelfTest.java | 138 +++++++++++++++----
 2 files changed, 128 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/add61614/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 32e94d3..7b80765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -96,22 +96,28 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                 @Override public T call() throws Exception {
                     T retVal;
 
-                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                        Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
+                    while (true) {
+                        try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                            Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
 
-                        if (idx != null) {
-                            checkRemoved(idx);
+                            if (idx != null) {
+                                checkRemoved(idx);
 
-                            retVal = (T)cache.getAndRemove(itemKey(idx));
+                                retVal = (T)cache.getAndRemove(itemKey(idx));
 
-                            assert retVal != null : idx;
-                        }
-                        else
-                            retVal = null;
+                                if (retVal == null) { // Possible if data was lost.
+                                    tx.commit();
 
-                        tx.commit();
+                                    continue;
+                                }
+                            }
+                            else
+                                retVal = null;
 
-                        return retVal;
+                            tx.commit();
+
+                            return retVal;
+                        }
                     }
                 }
             }).call();
@@ -188,9 +194,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                         if (idx != null) {
                             checkRemoved(idx);
 
-                            boolean rmv = cache.remove(itemKey(idx));
-
-                            assert rmv : idx;
+                            cache.remove(itemKey(idx));
                         }
 
                         tx.commit();

http://git-wip-us.apache.org/repos/asf/ignite/blob/add61614/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index cb16aeb..1e15c15 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -196,14 +196,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicLongConstantTopologyChange() throws Exception {
-        doTestAtomicLong(new ConstantTopologyChangeWorker());
+        doTestAtomicLong(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicLong(multipleTopologyChangeWorker());
+        doTestAtomicLong(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
@@ -258,14 +258,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicReferenceConstantTopologyChange() throws Exception {
-        doTestAtomicReference(new ConstantTopologyChangeWorker());
+        doTestAtomicReference(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicReference(multipleTopologyChangeWorker());
+        doTestAtomicReference(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
@@ -326,14 +326,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicStampedConstantTopologyChange() throws Exception {
-        doTestAtomicStamped(new ConstantTopologyChangeWorker());
+        doTestAtomicStamped(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicStampedConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicStamped(multipleTopologyChangeWorker());
+        doTestAtomicStamped(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
@@ -687,14 +687,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testCountDownLatchConstantTopologyChange() throws Exception {
-        doTestCountDownLatch(new ConstantTopologyChangeWorker());
+        doTestCountDownLatch(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
-        doTestCountDownLatch(multipleTopologyChangeWorker());
+        doTestCountDownLatch(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
@@ -758,15 +758,73 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /**
      * @throws Exception If failed.
      */
+    public void testQueueTopologyChange() throws Exception {
+        ConstantTopologyChangeWorker topWorker = new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT);
+
+        try (final IgniteQueue<Integer> q = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
+            for (int i = 0; i < 1000; i++)
+                q.add(i);
+
+            final IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+                @Override public Object apply(Ignite ignite) {
+                    return null;
+                }
+            });
+
+            IgniteInternalFuture<?> takeFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!fut.isDone())
+                        q.take();
+
+                    return null;
+                }
+            });
+
+            IgniteInternalFuture<?> pollFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!fut.isDone())
+                        q.poll();
+
+                    return null;
+                }
+            });
+
+            IgniteInternalFuture<?> addFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!fut.isDone())
+                        q.add(0);
+
+                    return null;
+                }
+            });
+
+            fut.get();
+
+            pollFut.get();
+            addFut.get();
+
+            q.add(0);
+
+            takeFut.get();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testQueueConstantTopologyChange() throws Exception {
-        doTestQueue(new ConstantTopologyChangeWorker());
+        int topChangeThreads = collectionCacheMode() == CacheMode.PARTITIONED ? 1 : TOP_CHANGE_THREAD_CNT;
+
+        doTestQueue(new ConstantTopologyChangeWorker(topChangeThreads));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testQueueConstantMultipleTopologyChange() throws Exception {
-        doTestQueue(multipleTopologyChangeWorker());
+        int topChangeThreads = collectionCacheMode() == CacheMode.PARTITIONED ? 1 : TOP_CHANGE_THREAD_CNT;
+
+        doTestQueue(multipleTopologyChangeWorker(topChangeThreads));
     }
 
     /**
@@ -902,14 +960,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicSequenceConstantTopologyChange() throws Exception {
-        doTestAtomicSequence(new ConstantTopologyChangeWorker());
+        doTestAtomicSequence(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
-        doTestAtomicSequence(multipleTopologyChangeWorker());
+        doTestAtomicSequence(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }
 
     /**
@@ -977,11 +1035,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     }
 
     /**
+     * @param topChangeThreads Number of topology change threads.
+     *
      * @return Specific multiple topology change worker implementation.
      */
-    private ConstantTopologyChangeWorker multipleTopologyChangeWorker() {
-        return collectionCacheMode() == CacheMode.PARTITIONED ? new PartitionedMultipleTopologyChangeWorker() :
-            new MultipleTopologyChangeWorker();
+    private ConstantTopologyChangeWorker multipleTopologyChangeWorker(int topChangeThreads) {
+        return collectionCacheMode() == CacheMode.PARTITIONED ?
+            new PartitionedMultipleTopologyChangeWorker(topChangeThreads) :
+            new MultipleTopologyChangeWorker(topChangeThreads);
     }
 
     /**
@@ -991,13 +1052,24 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
         /** */
         protected final AtomicBoolean failed = new AtomicBoolean(false);
 
+        /** */
+        private final int topChangeThreads;
+
+        /**
+         * @param topChangeThreads Number of topology change threads.
+         */
+        public ConstantTopologyChangeWorker(int topChangeThreads) {
+            this.topChangeThreads = topChangeThreads;
+        }
+
         /**
          * Starts changing cluster's topology.
          *
+         * @param cb Callback to run after node start.
          * @return Future.
          */
-        IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+        IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) {
+            return GridTestUtils.runMultiThreadedAsync(new CA() {
                 @Override public void apply() {
                     try {
                         for (int i = 0; i < TOP_CHANGE_CNT; i++) {
@@ -1011,7 +1083,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
                                 Ignite g = startGrid(name);
 
-                                callback.apply(g);
+                                cb.apply(g);
                             }
                             finally {
                                 if (i != TOP_CHANGE_CNT - 1)
@@ -1024,9 +1096,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                             throw F.wrap(e);
                     }
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
-            return fut;
+            }, topChangeThreads, "topology-change-thread");
         }
     }
 
@@ -1035,12 +1105,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      */
     private class MultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
         /**
+         * @param topChangeThreads Number of topology change threads.
+         */
+        public MultipleTopologyChangeWorker(int topChangeThreads) {
+            super(topChangeThreads);
+        }
+
+        /**
          * Starts changing cluster's topology.
          *
          * @return Future.
          */
-        @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+        @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) {
+            return GridTestUtils.runMultiThreadedAsync(new CA() {
                 @Override public void apply() {
                     try {
                         for (int i = 0; i < TOP_CHANGE_CNT; i++) {
@@ -1062,7 +1139,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
                                     names.add(name);
 
-                                    callback.apply(g);
+                                    cb.apply(g);
                                 }
                             }
                             finally {
@@ -1079,8 +1156,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                     }
                 }
             }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
-            return fut;
         }
     }
 
@@ -1092,11 +1167,18 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
         private CyclicBarrier barrier;
 
         /**
+         * @param topChangeThreads Number of topology change threads.
+         */
+        public PartitionedMultipleTopologyChangeWorker(int topChangeThreads) {
+            super(topChangeThreads);
+        }
+
+        /**
          * Starts changing cluster's topology.
          *
          * @return Future.
          */
-        @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+        @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) {
             final Semaphore sem = new Semaphore(TOP_CHANGE_THREAD_CNT);
 
             final ConcurrentSkipListSet<String> startedNodes = new ConcurrentSkipListSet<>();
@@ -1151,7 +1233,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
                                 Ignite g = startGrid(name);
 
-                                callback.apply(g);
+                                cb.apply(g);
                             }
 
                             try {