You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/03/04 04:33:55 UTC

[49/50] incubator-ignite git commit: IGNITE-45 - WIP

IGNITE-45 - WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/30d96ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/30d96ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/30d96ad5

Branch: refs/heads/ignite-45
Commit: 30d96ad594b2d9372a400847068e803bd3c9631a
Parents: 55a9c50
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Mar 3 19:21:08 2015 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Mar 3 19:21:08 2015 -0800

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 66 +++++++++++++++++---
 .../affinity/AffinityTopologyVersion.java       | 23 ++++++-
 .../affinity/GridAffinityAssignmentCache.java   |  6 ++
 .../cache/DynamicCacheDescriptor.java           | 28 ++++++---
 .../GridCachePartitionExchangeManager.java      | 54 ++++++++++------
 .../processors/cache/GridCacheProcessor.java    | 36 ++++++++---
 .../cache/GridCacheSharedContext.java           |  7 +++
 .../dht/GridDhtPartitionTopologyImpl.java       |  2 +-
 .../preloader/GridDhtPartitionExchangeId.java   |  5 +-
 .../GridDhtPartitionsExchangeFuture.java        | 34 +++++++++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  1 +
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 13 ++++
 12 files changed, 219 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index dce04e2..d891149 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -166,6 +166,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** Custom event listener. */
     private GridPlainInClosure<Serializable> customEvtLsnr;
 
+    /** Map of dynamic cache filters. */
+    private Map<String, IgnitePredicate<ClusterNode>> dynamicCacheFilters = new HashMap<>();
+
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -214,6 +217,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         getSpi().setNodeAttributes(attrs, ver);
     }
 
+    /**
+     * Adds dynamic cache filters.
+     *
+     * @param cacheName Cache name.
+     * @param filter Cache filter.
+     */
+    public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) {
+        IgnitePredicate<ClusterNode> old = dynamicCacheFilters.put(cacheName, filter);
+
+        assert old == null;
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         discoOrdered = discoOrdered();
@@ -277,10 +292,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         c.updateAlives(node);
                 }
 
+                if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                    try {
+                        customEvtLsnr.apply(data);
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Failed to notify direct custom event listener: " + data, e);
+                    }
+                }
+
                 // Put topology snapshot into discovery history.
                 // There is no race possible between history maintenance and concurrent discovery
                 // event notifications, since SPI notifies manager about all events from this listener.
-                if (type != EVT_NODE_METRICS_UPDATED && type != DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                if (type != EVT_NODE_METRICS_UPDATED) {
                     DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id())));
 
                     discoCacheHist.put(topVer, cache);
@@ -307,15 +331,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     return;
                 }
 
-                if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
-                    try {
-                        customEvtLsnr.apply(data);
-                    }
-                    catch (Exception e) {
-                        U.error(log, "Failed to notify direct custom event listener: " + data, e);
-                    }
-                }
-
                 if (topVer > 0 && (type == EVT_NODE_JOINED || type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)) {
                     boolean set = GridDiscoveryManager.this.topVer.setIfGreater(topVer);
 
@@ -1834,6 +1849,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                 GridCacheAttributes[] caches = node.attribute(ATTR_CACHE);
 
+                boolean hasCaches = false;
+
                 if (caches != null) {
                     nodesWithCaches.add(node);
 
@@ -1860,6 +1877,35 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         }
                     }
 
+                    hasCaches = true;
+                }
+
+                for (Map.Entry<String, IgnitePredicate<ClusterNode>> entry : dynamicCacheFilters.entrySet()) {
+                    String cacheName = entry.getKey();
+                    IgnitePredicate<ClusterNode> filter = entry.getValue();
+
+                    if (filter.apply(node)) {
+                        addToMap(cacheMap, cacheName, node);
+
+                        if (alive(node.id()))
+                            addToMap(aliveCacheNodes, maskNull(cacheName), node);
+
+                        addToMap(dhtNodesMap, cacheName, node);
+
+                        // TODO IGNITE-45 client and near caches.
+
+                        if (!loc.id().equals(node.id())) {
+                            addToMap(rmtCacheMap, cacheName, node);
+
+                            if (alive(node.id()))
+                                addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
+                        }
+
+                        hasCaches = true;
+                    }
+                }
+
+                if (hasCaches) {
                     if (alive(node.id())) {
                         aliveNodesWithCaches.add(node);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index e276253..cb24ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -89,17 +89,34 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
 
     /** {@inheritDoc} */
     @Override public int compareTo(AffinityTopologyVersion o) {
-        return Long.compare(topVer, o.topVer);
+        int cmp = Long.compare(topVer, o.topVer);
+
+        if (cmp == 0)
+            return Integer.compare(minorTopVer, o.minorTopVer);
+
+        return cmp;
     }
 
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
-        return o instanceof AffinityTopologyVersion && topVer == ((AffinityTopologyVersion)o).topVer;
+        if (this == o)
+            return true;
+
+        if (!(o instanceof AffinityTopologyVersion))
+            return false;
+
+        AffinityTopologyVersion that = (AffinityTopologyVersion)o;
+
+        return minorTopVer == that.minorTopVer && topVer == that.topVer;
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        return (int)topVer;
+        int result = (int)(topVer ^ (topVer >>> 32));
+
+        result = 31 * result + minorTopVer;
+
+        return result;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index b964f83..4bb5885 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -150,6 +150,8 @@ public class GridAffinityAssignmentCache {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
                 ", discoEvt=" + discoEvt + ']');
+        U.debug(log, "Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
+            ", discoEvt=" + discoEvt + ']');
 
         GridAffinityAssignment prev = affCache.get(topVer.previous());
 
@@ -162,6 +164,8 @@ public class GridAffinityAssignmentCache {
             // Resolve nodes snapshot for specified topology version.
             Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion());
 
+            U.debug(log, "Affinity nodes: " + nodes);
+
             sorted = sort(nodes);
         }
 
@@ -187,6 +191,8 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);
 
+        U.debug(log, "Updated assignment: " + updated);
+
         updated = F.addIfAbsent(affCache, topVer, updated);
 
         // Update top version, if required.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 196730c..6a6e227 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -28,28 +29,35 @@ import java.io.*;
  * Cache start descriptor.
  */
 public class DynamicCacheDescriptor implements Serializable {
+    /** Cache start ID. */
+    private IgniteUuid startId;
+
     /** Cache configuration. */
     @GridToStringExclude
     private CacheConfiguration cacheCfg;
 
     /** Deploy filter bytes. */
     @GridToStringExclude
-    private byte[] deployFltrBytes;
-
-    /** Cache start ID. */
-    private IgniteUuid startId;
+    private IgnitePredicate<ClusterNode> nodeFilter;
 
     /**
      * @param cacheCfg Cache configuration.
-     * @param deployFltrBytes Deployment filter bytes.
+     * @param nodeFilter Node filter.
      */
-    public DynamicCacheDescriptor(CacheConfiguration cacheCfg, byte[] deployFltrBytes, IgniteUuid startId) {
+    public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid startId) {
         this.cacheCfg = cacheCfg;
-        this.deployFltrBytes = deployFltrBytes;
+        this.nodeFilter = nodeFilter;
         this.startId = startId;
     }
 
     /**
+     * @return Start ID.
+     */
+    public IgniteUuid startId() {
+        return startId;
+    }
+
+    /**
      * @return Cache configuration.
      */
     public CacheConfiguration cacheConfiguration() {
@@ -57,10 +65,10 @@ public class DynamicCacheDescriptor implements Serializable {
     }
 
     /**
-     * @return Start ID.
+     * @return Node filter.
      */
-    public IgniteUuid startId() {
-        return startId;
+    public IgnitePredicate<ClusterNode> nodeFilter() {
+        return nodeFilter;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 76ecea4..d38161e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.events.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -114,6 +115,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                 final ClusterNode n = e.eventNode();
 
+                GridDhtPartitionExchangeId exchId = null;
+                GridDhtPartitionsExchangeFuture<K, V> exchFut = null;
+
                 if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
                     assert !loc.id().equals(n.id());
 
@@ -129,12 +133,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         "Node joined with smaller-than-local " +
                             "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
 
-                    GridDhtPartitionExchangeId exchId = exchangeId(n.id(),
+                    exchId = exchangeId(n.id(),
                         new AffinityTopologyVersion(e.topologyVersion(), minorTopVer = 0),
                         e.type());
 
-                    GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e);
+                    exchFut = exchangeFuture(exchId, e, null);
+                }
+                else {
+                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
+
+                    if (customEvt.data() instanceof DynamicCacheDescriptor) {
+                        DynamicCacheDescriptor desc = (DynamicCacheDescriptor)customEvt.data();
+
+                        // Check if this event should trigger partition exchange.
+                        if (cctx.cache().dynamicCacheRegistered(desc)) {
+                            exchId = exchangeId(n.id(),
+                                new AffinityTopologyVersion(e.topologyVersion(), ++minorTopVer),
+                                e.type());
+
+                            exchFut = exchangeFuture(exchId, e, desc);
+                        }
+                    }
+                }
 
+                if (exchId != null) {
                     // Start exchange process.
                     pendingExchangeFuts.add(exchFut);
 
@@ -161,9 +183,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         }
                     });
                 }
-                else {
-                    // TODO.
-                }
             }
             finally {
                 leaveBusy();
@@ -225,7 +244,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
 
-        GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt);
+        GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt, null);
 
         new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
 
@@ -399,16 +418,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * Callback to start exchange for dynamically started cache.
-     *
-     * @param cacheDesc Cache descriptor.
-     */
-    public void onCacheDeployed(DynamicCacheDescriptor cacheDesc) {
-        // TODO IGNITE-45 move to exchange future.
-        cctx.kernalContext().cache().onCacheStartFinished(cacheDesc);
-    }
-
-    /**
      * @return {@code True} if topology has changed.
      */
     public boolean topologyChanged() {
@@ -579,11 +588,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @return Exchange future.
      */
     GridDhtPartitionsExchangeFuture<K, V> exchangeFuture(GridDhtPartitionExchangeId exchId,
-        @Nullable DiscoveryEvent discoEvt) {
+        @Nullable DiscoveryEvent discoEvt, @Nullable DynamicCacheDescriptor startDesc) {
         GridDhtPartitionsExchangeFuture<K, V> fut;
 
         GridDhtPartitionsExchangeFuture<K, V> old = exchFuts.addx(
-            fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId));
+            fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, startDesc));
 
         if (old != null)
             fut = old;
@@ -606,6 +615,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     fut.cleanUp();
             }
         }
+
+        DynamicCacheDescriptor desc = exchFut.dynamicCacheDescriptor();
+
+        if (desc != null)
+            cctx.cache().onCacheStartFinished(desc);
     }
 
     /**
@@ -654,7 +668,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     refreshPartitions();
             }
             else
-                exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg);
+                exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
         }
         finally {
             leaveBusy();
@@ -692,7 +706,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     scheduleResendPartitions();
             }
             else
-                exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg);
+                exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
         }
         finally {
             leaveBusy();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 069930e..a22c9a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1170,6 +1170,30 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param desc Descriptor to check.
+     * @return {@code True} if cache was registered for start and exchange future should be created.
+     */
+    public boolean dynamicCacheRegistered(DynamicCacheDescriptor desc) {
+        return dynamicCaches.get(desc.cacheConfiguration().getName()) == desc;
+    }
+
+    /**
+     * @param startDesc Start descriptor.
+     */
+    public void onCacheStartExchange(DynamicCacheDescriptor startDesc) throws IgniteCheckedException {
+        CacheConfiguration cfg = new CacheConfiguration(startDesc.cacheConfiguration());
+
+        initialize(cfg);
+
+        GridCacheContext cacheCtx = createCache(cfg);
+
+        sharedCtx.addCacheContext(cacheCtx);
+
+        startCache(cacheCtx.cache());
+        onKernalStart(cacheCtx.cache());
+    }
+
+    /**
      * Callback invoked when first exchange future for dynamic cache is completed.
      *
      * @param startDesc Cache start descriptor.
@@ -1256,8 +1280,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, IgniteUuid.fromUuid(ctx.localNodeId()));
 
         try {
-            byte[] filterBytes = ctx.config().getMarshaller().marshal(nodeFilter);
-
             for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) {
                 if (ccfg0.getName().equals(ccfg.getName()))
                     return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
@@ -1274,11 +1296,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
                     "(a cache with the same name is already started): " + ccfg.getName()));
 
-            ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, filterBytes, fut.startId()));
+            ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, nodeFilter, fut.startId()));
 
             return fut;
         }
-        catch (IgniteCheckedException e) {
+        catch (Exception e) {
             fut.onDone(e);
 
             // Safety.
@@ -1315,11 +1337,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc);
 
-        assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
-
-        // TODO IGNITE-45 create cache context here.
+        ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter());
 
-        sharedCtx.exchange().onCacheDeployed(startDesc);
+        assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index e133a17..aadb153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -115,6 +115,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Cache processor.
+     */
+    public GridCacheProcessor cache() {
+        return kernalCtx.cache();
+    }
+
+    /**
      * Adds cache context to shared cache context.
      *
      * @param cacheCtx Cache context to add.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 4af7534..e86996d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -209,7 +209,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
             assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
                 topVer + ", exchId=" + exchId + ']';
 
-            if (!exchId.isJoined())
+            if (exchId.isLeft())
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index d101efd..1145bdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import org.apache.ignite.internal.events.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -28,6 +29,7 @@ import java.nio.*;
 import java.util.*;
 
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 
 /**
  * Exchange ID.
@@ -54,7 +56,8 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
      */
     public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull AffinityTopologyVersion topVer) {
         assert nodeId != null;
-        assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED;
+        assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED ||
+            evt == EVT_DISCOVERY_CUSTOM_EVT;
         assert topVer.topologyVersion() > 0;
 
         this.nodeId = nodeId;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 81ab4bf..de87904 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -142,6 +142,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
     /** Logger. */
     private IgniteLogger log;
 
+    /** Dynamic cache start descriptor. */
+    private DynamicCacheDescriptor startDesc;
+
     /**
      * Dummy future created to trigger reassignments if partition
      * topology changed while preloading.
@@ -197,8 +200,12 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
      */
-    public GridDhtPartitionsExchangeFuture(GridCacheSharedContext<K, V> cctx, ReadWriteLock busyLock,
-        GridDhtPartitionExchangeId exchId) {
+    public GridDhtPartitionsExchangeFuture(
+        GridCacheSharedContext<K, V> cctx,
+        ReadWriteLock busyLock,
+        GridDhtPartitionExchangeId exchId,
+        DynamicCacheDescriptor startDesc
+    ) {
         super(cctx.kernalContext());
 
         syncNotify(true);
@@ -213,6 +220,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
         this.cctx = cctx;
         this.busyLock = busyLock;
         this.exchId = exchId;
+        this.startDesc = startDesc;
 
         log = cctx.logger(getClass());
 
@@ -379,6 +387,13 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
     }
 
     /**
+     * @return Dynamic cache descriptor.
+     */
+    public DynamicCacheDescriptor dynamicCacheDescriptor() {
+        return startDesc;
+    }
+
+    /**
      * @return Init future.
      */
     IgniteInternalFuture<?> initFuture() {
@@ -422,6 +437,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
                 // will return corresponding nodes.
                 U.await(evtLatch);
 
+                if (startDesc != null)
+                    startCache();
+
                 assert discoEvt != null;
 
                 assert exchId.nodeId().equals(discoEvt.eventNode().id());
@@ -433,7 +451,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
                 }
 
                 // Grab all alive remote nodes with order of equal or less than last joined node.
-                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, exchId.topologyVersion().topologyVersion()));
+                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx,
+                    exchId.topologyVersion().topologyVersion()));
 
                 rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
 
@@ -547,6 +566,15 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
     }
 
     /**
+     * Starts dynamic cache.
+     */
+    private void startCache() throws IgniteCheckedException {
+        assert startDesc != null;
+
+        ctx.cache().onCacheStartExchange(startDesc);
+    }
+
+    /**
      * @param node Node.
      * @param id ID.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 34995ba..81b128e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -4481,6 +4481,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 }
 
                 msg.verify(getLocalNodeId());
+                msg.topologyVersion(ring.topologyVersion());
             }
 
             if (msg.verified()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 5d515e3..efb5db2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -136,4 +136,17 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
         assertEquals(1, succeeded);
         assertEquals(threadNum - 1, failed);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheSimple() throws Exception {
+        final IgniteKernal kernal = (IgniteKernal)grid(0);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName("TestCacheName3");
+
+        kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get();
+    }
 }