You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/11 09:00:17 UTC
[15/16] ignite git commit: ignite-1.5 Fix for transaction retry logic in DataStructuresProcessor.
ignite-1.5 Fix for transaction retry logic in DataStructuresProcessor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/847bd424
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/847bd424
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/847bd424
Branch: refs/heads/ignite-1537
Commit: 847bd424e098875c00bf5d3f8d42cf40b6e2dd52
Parents: a88d81a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 11 10:53:57 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 11 10:53:57 2015 +0300
----------------------------------------------------------------------
.../datastructures/DataStructuresProcessor.java | 161 +++++++++----------
1 file changed, 79 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/847bd424/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 51c4067..cd783e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -57,15 +57,16 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -504,8 +505,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
@Nullable private <T> T getAtomic(final IgniteOutClosureX<T> c,
- DataStructureInfo dsInfo,
- boolean create,
+ final DataStructureInfo dsInfo,
+ final boolean create,
Class<? extends T> cls)
throws IgniteCheckedException
{
@@ -527,39 +528,26 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (dataStructure != null)
return dataStructure;
- while (true) {
- try {
+ return retryTopologySafe(new IgniteOutClosureX<T>() {
+ @Override public T applyx() throws IgniteCheckedException {
if (!create)
return c.applyx();
try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+ IgniteCheckedException err =
+ utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
if (err != null)
throw err;
- dataStructure = c.applyx();
+ T dataStructure = c.applyx();
tx.commit();
return dataStructure;
}
}
- catch (IgniteTxRollbackCheckedException ignore) {
- // Safe to retry right away.
- }
- catch (IgniteCheckedException e) {
- ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
-
- if (topErr == null)
- throw e;
-
- IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
-
- if (fut != null)
- fut.get();
- }
- }
+ });
}
/**
@@ -597,10 +585,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
* @param afterRmv Optional closure to run after data structure removed.
* @throws IgniteCheckedException If failed.
*/
- private <T> void removeDataStructure(IgniteOutClosureX<T> c,
+ private <T> void removeDataStructure(final IgniteOutClosureX<T> c,
String name,
DataStructureType type,
- @Nullable IgniteInClosureX<T> afterRmv)
+ @Nullable final IgniteInClosureX<T> afterRmv)
throws IgniteCheckedException
{
Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
@@ -608,52 +596,42 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (dsMap == null || !dsMap.containsKey(name))
return;
- DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
+ final DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, false);
if (err != null)
throw err;
- while (true) {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- T2<Boolean, IgniteCheckedException> res =
- utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
-
- err = res.get2();
+ retryTopologySafe(new IgniteOutClosureX<Void>() {
+ @Override public Void applyx() throws IgniteCheckedException {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ T2<Boolean, IgniteCheckedException> res =
+ utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
- if (err != null)
- throw err;
+ IgniteCheckedException err = res.get2();
- assert res.get1() != null;
+ if (err != null)
+ throw err;
- boolean exists = res.get1();
+ assert res.get1() != null;
- if (!exists)
- return;
+ boolean exists = res.get1();
- T rmvInfo = c.applyx();
+ if (!exists)
+ return null;
- tx.commit();
+ T rmvInfo = c.applyx();
- if (afterRmv != null && rmvInfo != null)
- afterRmv.applyx(rmvInfo);
- }
- catch (IgniteTxRollbackCheckedException ignore) {
- // Safe to retry right away.
- }
- catch (IgniteCheckedException e) {
- ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
-
- if (topErr == null)
- throw e;
+ tx.commit();
- IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+ if (afterRmv != null && rmvInfo != null)
+ afterRmv.applyx(rmvInfo);
- if (fut != null)
- fut.get();
+ return null;
+ }
}
- }
+ });
}
/**
@@ -1000,7 +978,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
@Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c,
- DataStructureInfo dsInfo,
+ final DataStructureInfo dsInfo,
boolean create)
throws IgniteCheckedException
{
@@ -1028,41 +1006,29 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return c.applyx(cacheCtx);
}
- while (true) {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- T2<String, IgniteCheckedException> res =
- utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
-
- err = res.get2();
-
- if (err != null)
- throw err;
-
- String cacheName = res.get1();
+ return retryTopologySafe(new IgniteOutClosureX<T>() {
+ @Override public T applyx() throws IgniteCheckedException {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ T2<String, IgniteCheckedException> res =
+ utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
- final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
+ IgniteCheckedException err = res.get2();
- T col = c.applyx(cacheCtx);
+ if (err != null)
+ throw err;
- tx.commit();
+ String cacheName = res.get1();
- return col;
- }
- catch (IgniteTxRollbackCheckedException ignore) {
- // Safe to retry right away.
- }
- catch (IgniteCheckedException e) {
- ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+ final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
- if (topErr == null)
- throw e;
+ T col = c.applyx(cacheCtx);
- IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+ tx.commit();
- if (fut != null)
- fut.get();
+ return col;
+ }
}
- }
+ });
}
/**
@@ -1659,6 +1625,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/**
+ * Runs the given closure, retrying it after recoverable cluster topology errors.
+ * <p>
+ * Up to {@code GridCacheAdapter.MAX_RETRIES} attempts are made. A failure is rethrown to the
+ * caller right away when it happens on the last attempt, when {@code e.getCause(Class)} finds no
+ * {@link ClusterTopologyCheckedException}, or when the topology error found is a
+ * {@link ClusterTopologyServerNotFoundException}. Otherwise the method waits on the error's
+ * retry-ready future (if there is one) and runs the closure again.
+ *
+ * @param c Closure to run.
+ * @throws IgniteCheckedException If failed.
+ * @return Closure return value.
+ */
+ private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException {
+ for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
+ try {
+ return c.applyx();
+ }
+ catch (IgniteCheckedException e) {
+ // Out of attempts: propagate the last failure unchanged.
+ if (i == GridCacheAdapter.MAX_RETRIES - 1)
+ throw e;
+
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+ // Only topology errors are retried, and a "server not found" topology error is not.
+ if (topErr == null || (topErr instanceof ClusterTopologyServerNotFoundException))
+ throw e;
+
+ IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+ // Wait on the retry-ready future before the next attempt; an exception from get() is not caught here.
+ if (fut != null)
+ fut.get();
+ }
+ }
+
+ // Each iteration either returns or, on the last attempt, rethrows, so this point is not
+ // expected to be reached (presumably MAX_RETRIES is positive - confirm in GridCacheAdapter).
+ assert false;
+
+ return null;
+ }
+
+ /**
*
*/
enum DataStructureType {