You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/12 07:03:03 UTC
ignite git commit: IGNITE-801: using retryTopologySafe for all data
structures and atomics operations
Repository: ignite
Updated Branches:
refs/heads/ignite-801 5061f30d9 -> 2e2643657
IGNITE-801: using retryTopologySafe for all data structures and atomics operations
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e264365
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e264365
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e264365
Branch: refs/heads/ignite-801
Commit: 2e264365782e9242948b58184b77afc6f896fc9f
Parents: 5061f30
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Nov 11 19:14:33 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Nov 11 19:14:33 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheUtils.java | 20 ++-
.../datastructures/DataStructuresProcessor.java | 24 +--
.../GridAtomicCacheQueueImpl.java | 13 +-
.../datastructures/GridCacheQueueAdapter.java | 3 -
.../GridTransactionalCacheQueueImpl.java | 179 +++++++------------
5 files changed, 89 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index eba6e8d..a34cdba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -93,6 +94,7 @@ import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -1780,16 +1782,23 @@ public class GridCacheUtils {
public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
return new Callable<S>() {
@Override public S call() throws Exception {
- int retries = GridCacheAdapter.MAX_RETRIES;
-
IgniteCheckedException err = null;
- for (int i = 0; i < retries; i++) {
+ for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
try {
return c.call();
}
+ catch (ClusterGroupEmptyCheckedException e) {
+ throw e;
+ }
+ catch (TransactionRollbackException e) {
+ if (i == GridCacheAdapter.MAX_RETRIES)
+ throw e;
+
+ U.sleep(1);
+ }
catch (IgniteCheckedException e) {
- if (i == retries)
+ if (i == GridCacheAdapter.MAX_RETRIES)
throw e;
if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
@@ -1797,7 +1806,8 @@ public class GridCacheUtils {
topErr.retryReadyFuture().get();
}
- else if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
+ else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+ CachePartialUpdateCheckedException.class))
U.sleep(1);
else
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 3c14ecf..3559776 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -53,17 +53,15 @@ import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -1426,25 +1424,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
*/
public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
try {
- int cnt = 0;
-
- while (true) {
- try {
- return call.call();
- }
- catch (ClusterGroupEmptyCheckedException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to execute data structure operation, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ return GridCacheUtils.retryTopologySafe(call).call();
}
catch (IgniteCheckedException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index 28f8631..78aa9b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -24,6 +24,7 @@ import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -67,7 +68,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
break;
}
catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
+ if (cnt++ == GridCacheAdapter.MAX_RETRIES)
throw e;
else {
U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']');
@@ -122,7 +123,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
break;
}
catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
+ if (cnt++ == GridCacheAdapter.MAX_RETRIES)
throw e;
else {
U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']');
@@ -170,7 +171,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
break;
}
catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
+ if (cnt++ == GridCacheAdapter.MAX_RETRIES)
throw e;
else {
U.warn(log, "Failed to add items, will retry [err=" + e + ']');
@@ -217,7 +218,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
break;
}
catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
+ if (cnt++ == GridCacheAdapter.MAX_RETRIES)
throw e;
else {
U.warn(log, "Failed to add items, will retry [err=" + e + ']');
@@ -246,7 +247,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
return (Long)cache.invoke(queueKey, c).get();
}
catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
+ if (cnt++ == GridCacheAdapter.MAX_RETRIES)
throw e;
else {
U.warn(log, "Failed to update queue header, will retry [err=" + e + ']');
@@ -256,4 +257,4 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index eb23ad7..df1bd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
/** */
- protected static final int MAX_UPDATE_RETRIES = 100;
-
- /** */
protected static final long RETRY_DELAY = 1;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e264365/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index b14bb5a..0e7e032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -20,19 +20,17 @@ package org.apache.ignite.internal.processors.datastructures;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -55,12 +53,10 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
A.notNull(item, "item");
try {
- boolean retVal;
+ return retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ boolean retVal;
- int cnt = 0;
-
- while (true) {
- try {
try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get();
@@ -76,75 +72,53 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
tx.commit();
- break;
- }
- }
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
-
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
+ return retVal;
}
}
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public T poll() throws IgniteException {
try {
- int cnt = 0;
-
- T retVal;
-
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
+ return retryTopologySafe(new Callable<T>() {
+ @Override public T call() throws Exception {
+ T retVal;
- if (idx != null) {
- checkRemoved(idx);
-
- retVal = (T)cache.getAndRemove(itemKey(idx));
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
- assert retVal != null : idx;
- }
- else
- retVal = null;
+ if (idx != null) {
+ checkRemoved(idx);
- tx.commit();
+ retVal = (T)cache.getAndRemove(itemKey(idx));
- break;
- }
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
+ assert retVal != null : idx;
+ }
+ else
+ retVal = null;
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to poll item, will retry [err=" + e + ']');
+ tx.commit();
- U.sleep(RETRY_DELAY);
+ return retVal;
}
}
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@@ -153,95 +127,72 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
A.notNull(items, "items");
try {
- boolean retVal;
-
- int cnt = 0;
+ return retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ boolean retVal;
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
-
- if (idx != null) {
- checkRemoved(idx);
-
- Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
- for (T item : items) {
- putMap.put(itemKey(idx), item);
+ if (idx != null) {
+ checkRemoved(idx);
- idx++;
- }
+ Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
- cache.putAll(putMap);
+ for (T item : items) {
+ putMap.put(itemKey(idx), item);
- retVal = true;
- }
- else
- retVal = false;
+ idx++;
+ }
- tx.commit();
+ cache.putAll(putMap);
- break;
- }
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
+ retVal = true;
+ }
+ else
+ retVal = false;
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
+ tx.commit();
- U.sleep(RETRY_DELAY);
+ return retVal;
}
}
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException {
try {
- int cnt = 0;
+ retryTopologySafe(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
+ if (idx != null) {
+ checkRemoved(idx);
- if (idx != null) {
- checkRemoved(idx);
+ boolean rmv = cache.remove(itemKey(idx));
- boolean rmv = cache.remove(itemKey(idx));
+ assert rmv : idx;
+ }
- assert rmv : idx;
+ tx.commit();
}
- tx.commit();
-
- break;
+ return null;
}
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
-
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ }).call();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
}
}
}