You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/10/02 14:09:11 UTC

ignite git commit: ignite-801: fixed issue in atomic reference and countdown latch in case of topology changes

Repository: ignite
Updated Branches:
  refs/heads/ignite-801 ead78a4fa -> 1d7d7786e


ignite-801: fixed issue in atomic reference and countdown latch in case of topology changes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d7d7786
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d7d7786
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d7d7786

Branch: refs/heads/ignite-801
Commit: 1d7d7786ef40d8d416222af054203f3ab328a712
Parents: ead78a4
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Oct 2 15:09:03 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Oct 2 15:09:03 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java        | 19 +++++++--------
 .../GridCacheAtomicReferenceImpl.java           | 10 ++++----
 .../GridCacheCountDownLatchImpl.java            | 16 ++-----------
 .../GridTransactionalCacheQueueImpl.java        |  2 +-
 ...eAbstractDataStructuresFailoverSelfTest.java | 25 +++++++++++++++++---
 5 files changed, 39 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 7854c93..9bf8938 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1774,19 +1774,16 @@ public class GridCacheUtils {
                         return c.call();
                     }
                     catch (IgniteCheckedException e) {
-                        if (X.hasCause(e, ClusterTopologyCheckedException.class) ||
-                            X.hasCause(e, IgniteTxRollbackCheckedException.class) ||
-                            X.hasCause(e, CachePartialUpdateCheckedException.class)) {
-                            if (i < retries - 1) {
-                                err = e;
-
-                                U.sleep(1);
+                        if (i == retries)
+                            throw e;
 
-                                continue;
-                            }
+                        if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                            ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
 
-                            throw e;
+                            topErr.retryReadyFuture().get();
                         }
+                        else if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
+                            U.sleep(1);
                         else
                             throw e;
                     }
@@ -1797,4 +1794,4 @@ public class GridCacheUtils {
             }
         };
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index b25e111..2d8f7b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 
 /**
  * Cache atomic reference implementation.
@@ -230,7 +231,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
      * @return Callable for execution in async and sync mode.
      */
     private Callable<Boolean> internalSet(final T val) {
-        return new Callable<Boolean>() {
+        return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
                 try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -252,7 +253,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
                     throw e;
                 }
             }
-        };
+        });
     }
 
     /**
@@ -265,7 +266,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
      */
     private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
         final IgniteClosure<T, T> newValClos) {
-        return new Callable<Boolean>() {
+
+        return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
                 try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -295,7 +297,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
                     throw e;
                 }
             }
-        };
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 56be431..c984ab3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -342,21 +342,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     private class GetCountCallable implements Callable<Integer> {
         /** {@inheritDoc} */
         @Override public Integer call() throws Exception {
-            Integer val;
+            GridCacheCountDownLatchValue latchVal = latchView.get(key);
 
-            //REMOVE TR
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
-                if (latchVal == null)
-                    return 0;
-
-                val = latchVal.get();
-
-                tx.rollback();
-            }
-
-            return val;
+            return latchVal == null ? 0 : latchVal.get();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index c7750a6..b14bb5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -244,4 +244,4 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
             throw U.convertException(e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 0b12d63..eb07bbc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -451,6 +451,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception {
+        int queueMaxSize = 100;
+
         try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
             s.put(1);
 
@@ -464,15 +466,32 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
             int val = s.peek();
 
-            int origVal = val;
+            while (!fut.isDone()) {
+                if (s.size() == queueMaxSize) {
+                    int last = 0;
+
+                    for (int i = 0, size = s.size() - 1; i < size; i++) {
+                        int cur = s.poll();
+
+                        if (i == 0) {
+                            last = cur;
+
+                            continue;
+                        }
+
+                        assertEquals(last, cur - 1);
+
+                        last = cur;
+                    }
+                }
 
-            while (!fut.isDone())
                 s.put(++val);
+            }
 
             fut.get();
 
             for (Ignite g : G.allGrids())
-                assertEquals(origVal, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
+                assertEquals(val, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
         }
     }