You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/12 07:03:03 UTC

ignite git commit: IGNITE-801: using retryTopologySafe for all data structures and atomics operations

Repository: ignite
Updated Branches:
  refs/heads/ignite-801 5061f30d9 -> 2e2643657


IGNITE-801: using retryTopologySafe for all data structures and atomics operations


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e264365
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e264365
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e264365

Branch: refs/heads/ignite-801
Commit: 2e264365782e9242948b58184b77afc6f896fc9f
Parents: 5061f30
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Nov 11 19:14:33 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Nov 11 19:14:33 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java        |  20 ++-
 .../datastructures/DataStructuresProcessor.java |  24 +--
 .../GridAtomicCacheQueueImpl.java               |  13 +-
 .../datastructures/GridCacheQueueAdapter.java   |   3 -
 .../GridTransactionalCacheQueueImpl.java        | 179 +++++++------------
 5 files changed, 89 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index eba6e8d..a34cdba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -93,6 +94,7 @@ import org.apache.ignite.plugin.CachePluginConfiguration;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -1780,16 +1782,23 @@ public class GridCacheUtils {
     public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
         return new Callable<S>() {
             @Override public S call() throws Exception {
-                int retries = GridCacheAdapter.MAX_RETRIES;
-
                 IgniteCheckedException err = null;
 
-                for (int i = 0; i < retries; i++) {
+                for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
                     try {
                         return c.call();
                     }
+                    catch (ClusterGroupEmptyCheckedException e) {
+                        throw e;
+                    }
+                    catch (TransactionRollbackException e) {
+                        if (i == GridCacheAdapter.MAX_RETRIES)
+                            throw e;
+
+                        U.sleep(1);
+                    }
                     catch (IgniteCheckedException e) {
-                        if (i == retries)
+                        if (i == GridCacheAdapter.MAX_RETRIES)
                             throw e;
 
                         if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
@@ -1797,7 +1806,8 @@ public class GridCacheUtils {
 
                             topErr.retryReadyFuture().get();
                         }
-                        else if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
+                        else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+                            CachePartialUpdateCheckedException.class))
                             U.sleep(1);
                         else
                             throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 3c14ecf..3559776 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -53,17 +53,15 @@ import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheType;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -1426,25 +1424,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      */
     public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
         try {
-            int cnt = 0;
-
-            while (true) {
-                try {
-                    return call.call();
-                }
-                catch (ClusterGroupEmptyCheckedException e) {
-                    throw new IgniteCheckedException(e);
-                }
-                catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to execute data structure operation, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            return GridCacheUtils.retryTopologySafe(call).call();
         }
         catch (IgniteCheckedException e) {
             throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index 28f8631..78aa9b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -24,6 +24,7 @@ import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -67,7 +68,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
                     break;
                 }
                 catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
+                    if (cnt++ == GridCacheAdapter.MAX_RETRIES)
                         throw e;
                     else {
                         U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']');
@@ -122,7 +123,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
                         break;
                     }
                     catch (CachePartialUpdateCheckedException e) {
-                        if (cnt++ == MAX_UPDATE_RETRIES)
+                        if (cnt++ == GridCacheAdapter.MAX_RETRIES)
                             throw e;
                         else {
                             U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']');
@@ -170,7 +171,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
                     break;
                 }
                 catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
+                    if (cnt++ == GridCacheAdapter.MAX_RETRIES)
                         throw e;
                     else {
                         U.warn(log, "Failed to add items, will retry [err=" + e + ']');
@@ -217,7 +218,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
                     break;
                 }
                 catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
+                    if (cnt++ == GridCacheAdapter.MAX_RETRIES)
                         throw e;
                     else {
                         U.warn(log, "Failed to add items, will retry [err=" + e + ']');
@@ -246,7 +247,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
                 return (Long)cache.invoke(queueKey, c).get();
             }
             catch (CachePartialUpdateCheckedException e) {
-                if (cnt++ == MAX_UPDATE_RETRIES)
+                if (cnt++ == GridCacheAdapter.MAX_RETRIES)
                     throw e;
                 else {
                     U.warn(log, "Failed to update queue header, will retry [err=" + e + ']');
@@ -256,4 +257,4 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index eb23ad7..df1bd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
 
     /** */
-    protected static final int MAX_UPDATE_RETRIES = 100;
-
-    /** */
     protected static final long RETRY_DELAY = 1;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index b14bb5a..0e7e032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -20,19 +20,17 @@ package org.apache.ignite.internal.processors.datastructures;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.transactions.TransactionRollbackException;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
@@ -55,12 +53,10 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
         A.notNull(item, "item");
 
         try {
-            boolean retVal;
+            return retryTopologySafe(new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    boolean retVal;
 
-            int cnt = 0;
-
-            while (true) {
-                try {
                     try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                         Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get();
 
@@ -76,75 +72,53 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
 
                         tx.commit();
 
-                        break;
-                    }
-                }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
-
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
+                        return retVal;
                     }
                 }
-            }
-
-            return retVal;
+            }).call();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        catch (Exception e) {
+            throw new IgniteException(e.getMessage(), e);
+        }
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override public T poll() throws IgniteException {
         try {
-            int cnt = 0;
-
-            T retVal;
-
-            while (true) {
-                try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
+            return retryTopologySafe(new Callable<T>() {
+                @Override public T call() throws Exception {
+                    T retVal;
 
-                    if (idx != null) {
-                        checkRemoved(idx);
-
-                        retVal = (T)cache.getAndRemove(itemKey(idx));
+                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                        Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
 
-                        assert retVal != null : idx;
-                    }
-                    else
-                        retVal = null;
+                        if (idx != null) {
+                            checkRemoved(idx);
 
-                    tx.commit();
+                            retVal = (T)cache.getAndRemove(itemKey(idx));
 
-                    break;
-                }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
+                            assert retVal != null : idx;
+                        }
+                        else
+                            retVal = null;
 
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to poll item, will retry [err=" + e + ']');
+                        tx.commit();
 
-                        U.sleep(RETRY_DELAY);
+                        return retVal;
                     }
                 }
-            }
-
-            return retVal;
+            }).call();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        catch (Exception e) {
+            throw new IgniteException(e.getMessage(), e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -153,95 +127,72 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
         A.notNull(items, "items");
 
         try {
-            boolean retVal;
-
-            int cnt = 0;
+            return retryTopologySafe(new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    boolean retVal;
 
-            while (true) {
-                try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
-
-                    if (idx != null) {
-                        checkRemoved(idx);
-
-                        Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                        Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
 
-                        for (T item : items) {
-                            putMap.put(itemKey(idx), item);
+                        if (idx != null) {
+                            checkRemoved(idx);
 
-                            idx++;
-                        }
+                            Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
 
-                        cache.putAll(putMap);
+                            for (T item : items) {
+                                putMap.put(itemKey(idx), item);
 
-                        retVal = true;
-                    }
-                    else
-                        retVal = false;
+                                idx++;
+                            }
 
-                    tx.commit();
+                            cache.putAll(putMap);
 
-                    break;
-                }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
+                            retVal = true;
+                        }
+                        else
+                            retVal = false;
 
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add item, will retry [err=" + e + ']');
+                        tx.commit();
 
-                        U.sleep(RETRY_DELAY);
+                        return retVal;
                     }
                 }
-            }
-
-            return retVal;
+            }).call();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        catch (Exception e) {
+            throw new IgniteException(e.getMessage(), e);
+        }
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException {
         try {
-            int cnt = 0;
+            retryTopologySafe(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                        Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
 
-            while (true) {
-                try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
+                        if (idx != null) {
+                            checkRemoved(idx);
 
-                    if (idx != null) {
-                        checkRemoved(idx);
+                            boolean rmv = cache.remove(itemKey(idx));
 
-                        boolean rmv = cache.remove(itemKey(idx));
+                            assert rmv : idx;
+                        }
 
-                        assert rmv : idx;
+                        tx.commit();
                     }
 
-                    tx.commit();
-
-                    break;
+                    return null;
                 }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
-
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            }).call();
         }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
         }
     }
 }