You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/02 17:49:06 UTC

[43/50] incubator-ignite git commit: #ignite-732: IgniteCache.size() should not fail in case of topology changes.

#ignite-732: IgniteCache.size() should not fail in case of topology changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/139aa270
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/139aa270
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/139aa270

Branch: refs/heads/ignite-709_2
Commit: 139aa270ae61494c0757867f2dc531ec7251b1da
Parents: 0885ac0
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Apr 30 18:43:56 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Apr 30 18:43:56 2015 +0300

----------------------------------------------------------------------
 .../ignite/compute/ComputeTaskAdapter.java      |  14 +-
 .../processors/cache/GridCacheAdapter.java      | 503 ++++++++++---------
 .../processors/cache/GridCacheProcessor.java    | 109 ++--
 3 files changed, 349 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
index c2ad198..87081fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
@@ -24,15 +24,16 @@ import java.util.*;
 
 /**
  * Convenience adapter for {@link ComputeTask} interface. Here is an example of
- * how {@code GridComputeTaskAdapter} can be used:
+ * how {@code ComputeTaskAdapter} can be used:
  * <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String, String&gt; {
+ * public class MyFooBarTask extends ComputeTaskAdapter&lt;String, String&gt; {
  *     // Inject load balancer.
  *     &#64;LoadBalancerResource
  *     ComputeLoadBalancer balancer;
  *
  *     // Map jobs to grid nodes.
- *     public Map&lt;? extends ComputeJob, GridNode&gt; map(List&lt;GridNode&gt; subgrid, String arg) throws IgniteCheckedException {
+ *     public Map&lt;? extends ComputeJob, GridNode&gt; map(List&lt;GridNode&gt; subgrid, String arg)
+ *         throws IgniteCheckedException {
  *         Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob, GridNode&gt;(subgrid.size());
  *
  *         // In more complex cases, you can actually do
@@ -76,8 +77,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
      * <p>
      * If remote job resulted in exception ({@link ComputeJobResult#getException()} is not {@code null}),
      * then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception is instance
-     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, which means that
-     * remote node either failed or job execution was rejected before it got a chance to start. In all
+     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException},
+     * which means that remote node either failed or job execution was rejected before it got a chance to start. In all
      * other cases the exception will be rethrown which will ultimately cause task to fail.
      *
      * @param res Received remote grid executable result.
@@ -87,7 +88,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
      * @throws IgniteException If handling a job result caused an error effectively rejecting
      *      a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()} method.
      */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
+        throws IgniteException {
         IgniteException e = res.getException();
 
         // Try to failover if result is failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..39f19b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -75,6 +75,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Failed result. */
+    private static final Object FAIL = new Integer(-1);
+
     /** clearLocally() split threshold. */
     public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
 
@@ -882,7 +885,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<Cache.Entry<K, V>> entrySet() {
-        return entrySet((CacheEntryPredicate[]) null);
+        return entrySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
@@ -897,17 +900,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<K> keySet() {
-        return keySet((CacheEntryPredicate[]) null);
+        return keySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
-        return primaryKeySet((CacheEntryPredicate[]) null);
+        return primaryKeySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<V> values() {
-        return values((CacheEntryPredicate[]) null);
+        return values((CacheEntryPredicate[])null);
     }
 
     /**
@@ -1080,36 +1083,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public void clear() throws IgniteCheckedException {
-        // Clear local cache synchronously.
-        clearLocally();
-
-        clearRemotes(0, new GlobalClearAllCallable(name()));
+        clearAll(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
     }
 
     /** {@inheritDoc} */
     @Override public void clear(K key) throws IgniteCheckedException {
-        // Clear local cache synchronously.
-        clearLocally(key);
-
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+        clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+            Collections.singleton(key)));
     }
 
     /** {@inheritDoc} */
     @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException {
-        // Clear local cache synchronously.
-        clearLocallyAll(keys);
-
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
+        clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+            keys));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(K key) {
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+            Collections.singleton(key)));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) {
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
+        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+            keys));
     }
 
     /**
@@ -1118,19 +1116,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param clearCall Global clear callable object.
      * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes.
      */
-    private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException {
+    private void clearAll(long timeout, TopologyVersionAwareCallable clearCall) throws IgniteCheckedException {
         try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes =
-                ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
-
             IgniteInternalFuture<Object> fut = null;
 
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
+            ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
 
-                fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
-            }
+            fut = new ClearFuture(ctx, clearCall);
 
             if (fut != null)
                 fut.get();
@@ -1149,19 +1141,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync() {
-        return clearAsync(new GlobalClearAllCallable(name()));
+        return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
     }
 
     /**
      * @param clearCall Global clear callable object.
      * @return Future.
      */
-    private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
+    private IgniteInternalFuture<?> clearAsync(TopologyVersionAwareCallable clearCall) {
         Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes();
 
         if (!nodes.isEmpty()) {
-            IgniteInternalFuture<Object> fut =
-                ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
+            IgniteInternalFuture<Object> fut = new ClearFuture(ctx, clearCall);
 
             return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() {
                 @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException {
@@ -2117,7 +2108,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 throws IgniteCheckedException {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
                     new C1<K, EntryProcessor<K, V, Object>>() {
-                            @Override public EntryProcessor apply(K k) {
+                        @Override public EntryProcessor apply(K k) {
                             return entryProcessor;
                         }
                     });
@@ -2145,7 +2136,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() {
             @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
-                    Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
+                    Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
 
                 return tx.invokeAsync(ctx, invokeMap, args);
             }
@@ -2371,7 +2362,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
-                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
+                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
             @Override public String toString() {
@@ -2526,7 +2517,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
-                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
             @Override public String toString() {
@@ -2915,7 +2906,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return (GridCacheReturn) tx.putAllAsync(ctx,
+                return tx.putAllAsync(ctx,
                         F.t(key, newVal),
                         true,
                         null,
@@ -3017,7 +3008,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     ctx.deploy().registerClass(val);
 
                 return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
-                        ctx.equalsValArray(val)).get().success();
+                    ctx.equalsValArray(val)).get().success();
             }
 
             @Override public String toString() {
@@ -3230,10 +3221,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
 
         return txStart(
-                concurrency,
-                isolation,
-                cfg.getDefaultTxTimeout(),
-                0
+            concurrency,
+            isolation,
+            cfg.getDefaultTxTimeout(),
+            0
         );
     }
 
@@ -3576,22 +3567,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        IgniteInternalFuture<Collection<Integer>> fut =
-            ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes);
-
-        return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() {
-            @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut)
-            throws IgniteCheckedException {
-                Collection<Integer> res = fut.get();
-
-                int totalSize = 0;
-
-                for (Integer size : res)
-                    totalSize += size;
-
-                return totalSize;
-            }
-        });
+        return new SizeFuture(peekModes, ctx, modes.near);
     }
 
     /** {@inheritDoc} */
@@ -3675,7 +3651,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         return F.iterator(iterator(),
             new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
                 private IgniteCacheExpiryPolicy expiryPlc =
-                        ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+                    ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
 
                 @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
                     CacheOperationContext prev = ctx.gate().enter(opCtx);
@@ -3909,50 +3885,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
-     * Gets cache global size (with or without backups).
-     *
-     * @param primaryOnly {@code True} if only primary sizes should be included.
-     * @return Global size.
-     * @throws IgniteCheckedException If internal task execution failed.
-     */
-    private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
-            IgniteInternalFuture<Collection<Integer>> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
-
-                fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes);
-            }
-
-            // Get local value.
-            int globalSize = primaryOnly ? primarySize() : size();
-
-            if (fut != null) {
-                for (Integer i : fut.get())
-                    globalSize += i;
-            }
-
-            return globalSize;
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
-
-            return primaryOnly ? primarySize() : size();
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
-
-            throw e;
-        }
-    }
-
-    /**
      * @param op Cache operation.
      * @param <T> Return type.
      * @return Operation result.
@@ -4893,47 +4825,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
-     * Internal callable which performs clear operation on a cache with the given name.
-     */
-    @GridInternal
-    private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable {
-        /** Cache name. */
-        protected String cacheName;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        protected Ignite ignite;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalClearCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         */
-        protected GlobalClearCallable(String cacheName) {
-            this.cacheName = cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cacheName = U.readString(in);
-        }
-    }
-
-    /**
      * Global clear all.
      */
     @GridInternal
-    private static class GlobalClearAllCallable extends GlobalClearCallable {
+    private static class GlobalClearAllCallable extends TopologyVersionAwareCallable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4946,24 +4841,30 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          */
-        private GlobalClearAllCallable(String cacheName) {
-            super(cacheName);
+        private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) {
+            super(cacheName, topVer);
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
+        @Override protected Object callLocal() {
             ((IgniteEx)ignite).cachex(cacheName).clearLocally();
 
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
+            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
+        }
     }
 
     /**
      * Global clear keys.
      */
     @GridInternal
-    private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable {
+    private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareCallable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4979,33 +4880,25 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) {
-            super(cacheName);
+        protected GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
+            super(cacheName, topVer);
 
             this.keys = keys;
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
-
-            return null;
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
+            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
         }
 
         /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            super.writeExternal(out);
-
-            out.writeObject(keys);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            super.readExternal(in);
+        @Override protected Object callLocal() {
+            ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
 
-            keys = (Set<K>) in.readObject();
+            return null;
         }
     }
 
@@ -5013,127 +4906,202 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable {
+    private static class GlobalSizeCallable extends TopologyVersionAwareCallable {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache name. */
-        private String cacheName;
-
         /** Peek modes. */
         private CachePeekMode[] peekModes;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        /** Near enabled flag. */
+        private boolean nearEnable;
 
         /**
          * Required by {@link Externalizable}.
          */
-        public SizeCallable() {
+        public GlobalSizeCallable() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
-            this.cacheName = cacheName;
+        private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, boolean nearEnable) {
+            super(cacheName, topVer);
+
             this.peekModes = peekModes;
+            this.nearEnable = nearEnable;
         }
 
         /** {@inheritDoc} */
-        @Override public Integer applyx(Object o) throws IgniteCheckedException {
-            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
-            assert cache != null : cacheName;
+        @Override protected Object callLocal() {
+            try {
+                IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
 
-            return cache.localSize(peekModes);
+                return cache == null ? 0 : cache.localSize(peekModes);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
+            IgniteClusterEx cluster = ctx.grid().cluster();
 
-            out.writeInt(peekModes.length);
+            ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), true, true, false) : cluster.forDataNodes(ctx.name());
 
-            for (int i = 0; i < peekModes.length; i++)
-                U.writeEnum(out, peekModes[i]);
+            return grp.nodes();
         }
 
         /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cacheName = U.readString(in);
+        public String toString() {
+            return S.toString(GlobalSizeCallable.class, this);
+        }
+    }
 
-            int len = in.readInt();
+    /**
+     * Cache size future.
+     */
+    private static class SizeFuture extends RetryFuture {
+        /** Size. */
+        private int size = 0;
 
-            peekModes = new CachePeekMode[len];
+        /**
+         * @param peekModes Peek modes.
+         */
+        public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near) {
+            super(ctx, new GlobalSizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, near));
+        }
 
-            for (int i = 0; i < len; i++)
-                peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
+        /** {@inheritDoc} */
+        @Override protected void onInit() {
+            size = 0;
         }
 
         /** {@inheritDoc} */
-        public String toString() {
-            return S.toString(SizeCallable.class, this);
+        @Override protected void onLocal(Object localRes) {
+            size += (Integer)localRes;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void allDone() {
+            onDone(size);
         }
     }
 
     /**
-     * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
-     * operation on a cache with the given name.
+     * Cache clear future.
      */
-    @GridInternal
-    private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
+    private static class ClearFuture extends RetryFuture {
+        /**
+         */
+        public ClearFuture(GridCacheContext ctx, TopologyVersionAwareCallable clearCall) {
+            super(ctx, clearCall);
+        }
 
-        /** Cache name. */
-        private String cacheName;
+        /** {@inheritDoc} */
+        @Override protected void onInit() {
+           // No-op.
+        }
 
-        /** Primary only flag. */
-        private boolean primaryOnly;
+        /** {@inheritDoc} */
+        @Override protected void onLocal(Object localRes) {
+            // No-op.
+        }
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        /** {@inheritDoc} */
+        @Override protected void allDone() {
+            onDone();
+        }
+    }
+
+    /**
+     * Retry future.
+     */
+    protected static abstract class RetryFuture<T> extends GridFutureAdapter<T> {
+        /** Context. */
+        private final GridCacheContext ctx;
+
+        /** Callable. */
+        private final TopologyVersionAwareCallable call;
+
+        /** Max retries count before issuing an error. */
+        private volatile int retries = 32;
 
         /**
-         * Empty constructor for serialization.
          */
-        public GlobalSizeCallable() {
-            // No-op.
+        public RetryFuture(GridCacheContext ctx, TopologyVersionAwareCallable call) {
+            this.ctx = ctx;
+            this.call = call;
+
+            init();
         }
 
         /**
-         * @param cacheName Cache name.
-         * @param primaryOnly Primary only flag.
+         * Init.
          */
-        private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
-            this.cacheName = cacheName;
-            this.primaryOnly = primaryOnly;
-        }
+        private void init() {
+            Collection<ClusterNode> nodes = call.nodes(ctx);
 
-        /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
-            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
+            call.topologyVersion(ctx.affinity().affinityTopologyVersion());
 
-            return primaryOnly ? cache.primarySize() : cache.size();
-        }
+            IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST,
+                F.asSet((Callable<Object>)call), nodes, true);
 
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-            out.writeBoolean(primaryOnly);
-        }
+            fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() {
+                @Override public void apply(IgniteInternalFuture<Collection<Object>> fut) {
+                    try {
+                        Collection<Object> res = fut.get();
 
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cacheName = U.readString(in);
-            primaryOnly = in.readBoolean();
+                        onInit();
+
+                        for (Object locRes : res) {
+                            if (locRes == FAIL) {
+                                if (retries-- > 0)
+                                    init();
+                                else {
+                                    onDone(new ClusterTopologyException("Failed to wait topology."));
+
+                                    return;
+                                }
+                            }
+
+                            onLocal(locRes);
+                        }
+
+                        allDone();
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (X.hasCause(e, ClusterTopologyException.class)) {
+                            if (retries-- > 0)
+                                init();
+                            else
+                                onDone(e);
+                        }
+                        else
+                            onDone(e);
+                    }
+                }
+            });
         }
+
+        /**
+         * Init reducer.
+         */
+        protected abstract void onInit();
+
+        /**
+         * @param localRes Local result to add to the global result.
+         */
+        protected abstract void onLocal(Object localRes);
+
+        /**
+         * On done.
+         */
+        protected abstract void allDone();
     }
 
     /**
@@ -5697,4 +5665,89 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
         }
     }
+
+    /**
+     * Callable returning {@code FAIL} instead of a result when the local topology version differs from {@link #topVer}.
+     */
+    protected static abstract class TopologyVersionAwareCallable<K, V> implements Serializable, Callable<Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance, injected through {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Topology version the local one is compared against. */
+        protected AffinityTopologyVersion topVer;
+
+        /** Name of the cache whose local topology version is checked. */
+        protected String cacheName;
+
+        /** No-arg constructor, required for serialization. */
+        public TopologyVersionAwareCallable() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         */
+        public TopologyVersionAwareCallable(String cacheName, AffinityTopologyVersion topVer) {
+            this.topVer = topVer;
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            // Do not run the local part unless the local topology version matches the requested one.
+            if (compareTopologyVersions()) {
+                Object locRes = callLocal();
+
+                // The local version is checked once more after callLocal() before the result is returned.
+                if (compareTopologyVersions())
+                    return locRes;
+            }
+
+            return FAIL;
+        }
+
+        /**
+         * Performs the actual local work of this callable.
+         *
+         * @return Local result.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected abstract Object callLocal() throws IgniteCheckedException;
+
+        /**
+         * @param ctx Cache context.
+         * @return Nodes to call.
+         */
+        protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx);
+
+        /**
+         * Checks whether the local affinity topology version of the cache matches {@link #topVer}.
+         *
+         * @return {@code True} if versions are equal, {@code false} if they differ or the cache is not found.
+         */
+        public boolean compareTopologyVersions() {
+            IgniteKernal kernal = (IgniteKernal)ignite;
+
+            GridCacheAdapter<K, V> cache = kernal.context().cache().internalCache(cacheName);
+
+            if (cache == null)
+                return false;
+
+            AffinityTopologyVersion locVer = cache.context().affinity().affinityTopologyVersion();
+
+            return locVer.compareTo(topVer) == 0;
+        }
+
+        /**
+         * @param topVer New affinity topology version to compare the local one against.
+         */
+        public void topologyVersion(AffinityTopologyVersion topVer) {
+            this.topVer = topVer;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c0026ab..77fa104 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Must use JDK marshaller since it is used by discovery to fire custom events. */
     private Marshaller marshaller = new JdkMarshaller();
 
+    /** Latch counted down when {@link #onKernalStart()} finishes (normally or not) and on kernal stop. */
+    private CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+
     /**
      * @param ctx Kernal context.
      */
@@ -657,87 +660,92 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
+        try {
+            if (ctx.config().isDaemon())
+                return;
 
-        ClusterNode locNode = ctx.discovery().localNode();
+            ClusterNode locNode = ctx.discovery().localNode();
 
-        // Init cache plugin managers.
-        final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
+            // Init cache plugin managers.
+            final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
 
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            CacheConfiguration locCcfg = desc.cacheConfiguration();
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                CacheConfiguration locCcfg = desc.cacheConfiguration();
 
-            CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
+                CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
 
-            cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
-        }
+                cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+            }
 
-        if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
-            for (ClusterNode n : ctx.discovery().remoteNodes()) {
-                checkTransactionConfiguration(n);
+            if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+                for (ClusterNode n : ctx.discovery().remoteNodes()) {
+                    checkTransactionConfiguration(n);
 
-                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
-                DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+                    DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+                    DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 
-                CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
-                    locDepMode, rmtDepMode, true);
+                    CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+                        locDepMode, rmtDepMode, true);
 
-                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                    CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+                    for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                        CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
 
-                    if (rmtCfg != null) {
-                        CacheConfiguration locCfg = desc.cacheConfiguration();
+                        if (rmtCfg != null) {
+                            CacheConfiguration locCfg = desc.cacheConfiguration();
 
-                        checkCache(locCfg, rmtCfg, n);
+                            checkCache(locCfg, rmtCfg, n);
 
-                        // Check plugin cache configurations.
-                        CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+                            // Check plugin cache configurations.
+                            CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
 
-                        assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                            assert pluginMgr != null : " Map=" + cache2PluginMgr;
 
-                        pluginMgr.validateRemotes(rmtCfg, n);
+                            pluginMgr.validateRemotes(rmtCfg, n);
+                        }
                     }
                 }
             }
-        }
 
-        // Start dynamic caches received from collect discovery data.
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            boolean started = desc.onStart();
+            // Start dynamic caches received from collect discovery data.
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                boolean started = desc.onStart();
 
-            assert started : "Failed to change started flag for locally configured cache: " + desc;
+                assert started : "Failed to change started flag for locally configured cache: " + desc;
 
-            desc.clearRemoteConfigurations();
+                desc.clearRemoteConfigurations();
 
-            CacheConfiguration ccfg = desc.cacheConfiguration();
+                CacheConfiguration ccfg = desc.cacheConfiguration();
 
-            IgnitePredicate filter = ccfg.getNodeFilter();
+                IgnitePredicate filter = ccfg.getNodeFilter();
 
-            if (filter.apply(locNode)) {
-                CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+                if (filter.apply(locNode)) {
+                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-                CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+                    CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
 
-                assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                    assert pluginMgr != null : " Map=" + cache2PluginMgr;
 
-                GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+                    GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
 
-                ctx.dynamicDeploymentId(desc.deploymentId());
+                    ctx.dynamicDeploymentId(desc.deploymentId());
 
-                sharedCtx.addCacheContext(ctx);
+                    sharedCtx.addCacheContext(ctx);
 
-                GridCacheAdapter cache = ctx.cache();
+                    GridCacheAdapter cache = ctx.cache();
 
-                String name = ccfg.getName();
+                    String name = ccfg.getName();
 
-                caches.put(maskNull(name), cache);
+                    caches.put(maskNull(name), cache);
 
-                startCache(cache);
+                    startCache(cache);
 
-                jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+                    jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+                }
             }
         }
+        finally {
+            cacheStartedLatch.countDown();
+        }
 
         ctx.marshallerContext().onMarshallerCacheStarted(ctx);
 
@@ -835,6 +843,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStop(boolean cancel) {
+        cacheStartedLatch.countDown();
+
         if (ctx.config().isDaemon())
             return;
 
@@ -2686,6 +2696,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Getting internal cache adapter: " + name);
 
+        try {
+            cacheStartedLatch.await();
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag so callers up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteException("Failed to wait starting caches.", e);
+        }
+
         return (GridCacheAdapter<K, V>)caches.get(maskNull(name));
     }