You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/12/11 16:56:05 UTC

[37/59] [abbrv] ignite git commit: ignite-1.5 Fix for transaction retry logic in DataStructuresProcessor. Fixed CacheObjectBinaryProcessorImpl.metadata for client nodes to try to get the metadata from the cache if the local value is not found.

ignite-1.5 Fix for transaction retry logic in DataStructuresProcessor. Fixed CacheObjectBinaryProcessorImpl.metadata for client nodes to try to get the metadata from the cache if the local value is not found.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e0ef348
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e0ef348
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e0ef348

Branch: refs/heads/ignite-843-rc2
Commit: 6e0ef3480ed7d2df33aea6c23dff69c0cd957306
Parents: f6555da
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 11 14:55:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 11 14:55:27 2015 +0300

----------------------------------------------------------------------
 .../binary/CacheObjectBinaryProcessorImpl.java  |  12 +-
 .../datastructures/DataStructuresProcessor.java | 161 +++++++++----------
 2 files changed, 89 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6e0ef348/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 9ae8a62..12e7078 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -499,8 +499,16 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /** {@inheritDoc} */
     @Nullable @Override public BinaryType metadata(final int typeId) throws BinaryObjectException {
         try {
-            if (clientNode)
-                return clientMetaDataCache.get(typeId);
+            if (clientNode) {
+                BinaryType typeMeta = clientMetaDataCache.get(typeId);
+
+                if (typeMeta != null)
+                    return typeMeta;
+
+                BinaryMetadata meta = metaDataCache.getTopologySafe(new PortableMetadataKey(typeId));
+
+                return meta != null ? meta.wrap(portableCtx) : null;
+            }
             else {
                 PortableMetadataKey key = new PortableMetadataKey(typeId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e0ef348/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 51c4067..cd783e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -57,15 +57,16 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -504,8 +505,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     @Nullable private <T> T getAtomic(final IgniteOutClosureX<T> c,
-        DataStructureInfo dsInfo,
-        boolean create,
+        final DataStructureInfo dsInfo,
+        final boolean create,
         Class<? extends T> cls)
         throws IgniteCheckedException
     {
@@ -527,39 +528,26 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         if (dataStructure != null)
             return dataStructure;
 
-        while (true) {
-            try {
+        return retryTopologySafe(new IgniteOutClosureX<T>() {
+            @Override public T applyx() throws IgniteCheckedException {
                 if (!create)
                     return c.applyx();
 
                 try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                    err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+                    IgniteCheckedException err =
+                        utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
 
                     if (err != null)
                         throw err;
 
-                    dataStructure = c.applyx();
+                    T dataStructure = c.applyx();
 
                     tx.commit();
 
                     return dataStructure;
                 }
             }
-            catch (IgniteTxRollbackCheckedException ignore) {
-                // Safe to retry right away.
-            }
-            catch (IgniteCheckedException e) {
-                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
-
-                if (topErr == null)
-                    throw e;
-
-                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
-
-                if (fut != null)
-                    fut.get();
-            }
-        }
+        });
     }
 
     /**
@@ -597,10 +585,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @param afterRmv Optional closure to run after data structure removed.
      * @throws IgniteCheckedException If failed.
      */
-    private <T> void removeDataStructure(IgniteOutClosureX<T> c,
+    private <T> void removeDataStructure(final IgniteOutClosureX<T> c,
         String name,
         DataStructureType type,
-        @Nullable IgniteInClosureX<T> afterRmv)
+        @Nullable final IgniteInClosureX<T> afterRmv)
         throws IgniteCheckedException
     {
         Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
@@ -608,52 +596,42 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         if (dsMap == null || !dsMap.containsKey(name))
             return;
 
-        DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
+        final DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
 
         IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, false);
 
         if (err != null)
             throw err;
 
-        while (true) {
-            try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                T2<Boolean, IgniteCheckedException> res =
-                    utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
-
-                err = res.get2();
+        retryTopologySafe(new IgniteOutClosureX<Void>() {
+            @Override public Void applyx() throws IgniteCheckedException {
+                try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                    T2<Boolean, IgniteCheckedException> res =
+                        utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
 
-                if (err != null)
-                    throw err;
+                    IgniteCheckedException err = res.get2();
 
-                assert res.get1() != null;
+                    if (err != null)
+                        throw err;
 
-                boolean exists = res.get1();
+                    assert res.get1() != null;
 
-                if (!exists)
-                    return;
+                    boolean exists = res.get1();
 
-                T rmvInfo = c.applyx();
+                    if (!exists)
+                        return null;
 
-                tx.commit();
+                    T rmvInfo = c.applyx();
 
-                if (afterRmv != null && rmvInfo != null)
-                    afterRmv.applyx(rmvInfo);
-            }
-            catch (IgniteTxRollbackCheckedException ignore) {
-                // Safe to retry right away.
-            }
-            catch (IgniteCheckedException e) {
-                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
-
-                if (topErr == null)
-                    throw e;
+                    tx.commit();
 
-                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+                    if (afterRmv != null && rmvInfo != null)
+                        afterRmv.applyx(rmvInfo);
 
-                if (fut != null)
-                    fut.get();
+                    return null;
+                }
             }
-        }
+        });
     }
 
     /**
@@ -1000,7 +978,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c,
-        DataStructureInfo dsInfo,
+        final DataStructureInfo dsInfo,
         boolean create)
         throws IgniteCheckedException
     {
@@ -1028,41 +1006,29 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
             return c.applyx(cacheCtx);
         }
 
-        while (true) {
-            try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                T2<String, IgniteCheckedException> res =
-                    utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
-
-                err = res.get2();
-
-                if (err != null)
-                    throw err;
-
-                String cacheName = res.get1();
+        return retryTopologySafe(new IgniteOutClosureX<T>() {
+            @Override public T applyx() throws IgniteCheckedException {
+                try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                    T2<String, IgniteCheckedException> res =
+                        utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
 
-                final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
+                    IgniteCheckedException err = res.get2();
 
-                T col = c.applyx(cacheCtx);
+                    if (err != null)
+                        throw err;
 
-                tx.commit();
+                    String cacheName = res.get1();
 
-                return col;
-            }
-            catch (IgniteTxRollbackCheckedException ignore) {
-                // Safe to retry right away.
-            }
-            catch (IgniteCheckedException e) {
-                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+                    final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
 
-                if (topErr == null)
-                    throw e;
+                    T col = c.applyx(cacheCtx);
 
-                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+                    tx.commit();
 
-                if (fut != null)
-                    fut.get();
+                    return col;
+                }
             }
-        }
+        });
     }
 
     /**
@@ -1659,6 +1625,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param c Closure to run.
+     * @throws IgniteCheckedException If failed.
+     * @return Closure return value.
+     */
+    private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException {
+        for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
+            try {
+                return c.applyx();
+            }
+            catch (IgniteCheckedException e) {
+                if (i == GridCacheAdapter.MAX_RETRIES - 1)
+                    throw e;
+
+                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                if (topErr == null || (topErr instanceof ClusterTopologyServerNotFoundException))
+                    throw e;
+
+                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+                if (fut != null)
+                    fut.get();
+            }
+        }
+
+        assert false;
+
+        return null;
+    }
+
+    /**
      *
      */
     enum DataStructureType {