You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/25 14:01:29 UTC

[08/14] ignite git commit: IGNITE-6667 wip.

IGNITE-6667 wip.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c911b590
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c911b590
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c911b590

Branch: refs/heads/ignite-6667
Commit: c911b590c9e7d5143ff60f68753e54392ea1e86f
Parents: 44c4c57
Author: ascherbakoff <al...@gmail.com>
Authored: Tue Oct 24 01:37:37 2017 +0300
Committer: ascherbakoff <al...@gmail.com>
Committed: Tue Oct 24 01:37:37 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |  26 +---
 .../discovery/DiscoveryCustomMessage.java       |  12 +-
 .../discovery/GridDiscoveryManager.java         | 141 ++++++++++++++-----
 .../discovery/ReuseDiscoCacheStrategy.java      |  65 +++++++++
 .../cache/CacheAffinityChangeMessage.java       |  16 +++
 .../ClientCacheChangeDiscoveryMessage.java      |  17 +++
 .../ClientCacheChangeDummyDiscoveryMessage.java |  17 +++
 .../cache/DynamicCacheChangeBatch.java          |  19 ++-
 .../binary/MetadataUpdateAcceptedMessage.java   |  16 +++
 .../binary/MetadataUpdateProposedMessage.java   |  16 +++
 .../cluster/ChangeGlobalStateFinishMessage.java |  17 +++
 .../cluster/ChangeGlobalStateMessage.java       |  19 ++-
 .../continuous/AbstractContinuousMessage.java   |  18 +++
 .../StartRoutineAckDiscoveryMessage.java        |   2 +-
 .../StartRoutineDiscoveryMessage.java           |   2 +-
 .../StopRoutineAckDiscoveryMessage.java         |   2 +-
 .../continuous/StopRoutineDiscoveryMessage.java |   2 +-
 .../marshaller/MappingAcceptedMessage.java      |  16 +++
 .../marshaller/MappingProposedMessage.java      |  17 +++
 .../message/SchemaAbstractDiscoveryMessage.java |  17 +++
 .../message/SchemaProposeDiscoveryMessage.java  |   3 +-
 21 files changed, 393 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 7206223..0578c57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -71,10 +71,10 @@ public class DiscoCache {
     private final Map<Integer, List<ClusterNode>> cacheGrpAffNodes;
 
     /** Node map. */
-    private final Map<UUID, ClusterNode> nodeMap;
+    final Map<UUID, ClusterNode> nodeMap;
 
     /** Alive nodes. */
-    private final Set<UUID> alives = new GridConcurrentHashSet<>();
+    final Set<UUID> alives = new GridConcurrentHashSet<>();
 
     /** */
     private final IgniteProductVersion minNodeVer;
@@ -109,7 +109,7 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
         Set<UUID> alives,
-        @Nullable IgniteProductVersion minVer) {
+        IgniteProductVersion minVer) {
         this.topVer = topVer;
         this.state = state;
         this.loc = loc;
@@ -122,18 +122,6 @@ public class DiscoCache {
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
         this.alives.addAll(alives);
-
-        if (minVer == null) {
-            for (int i = 0; i < allNodes.size(); i++) {
-                ClusterNode node = allNodes.get(i);
-
-                if (minVer == null)
-                    minVer = node.version();
-                else if (node.version().compareTo(minVer) < 0)
-                    minVer = node.version();
-            }
-        }
-
         minNodeVer = minVer;
     }
 
@@ -333,10 +321,10 @@ public class DiscoCache {
      * @param ver Version.
      * @return Copy.
      */
-    public DiscoCache copy(AffinityTopologyVersion ver) {
+    public DiscoCache copy(@Nullable AffinityTopologyVersion ver, @Nullable DiscoveryDataClusterState st) {
         return new DiscoCache(
-            ver,
-            state,
+            ver == null ? topVer : ver,
+            st == null ? state : st,
             loc,
             rmtNodes,
             allNodes,
@@ -354,4 +342,4 @@ public class DiscoCache {
     @Override public String toString() {
         return S.toString(DiscoCache.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index f908b59..1101695 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.managers.discovery;
 
 import java.io.Serializable;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
@@ -89,4 +90,13 @@ public interface DiscoveryCustomMessage extends Serializable {
      * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
      */
     public boolean isMutable();
-}
\ No newline at end of file
+
+    /**
+     * Creates new discovery cache in most efficient way for current message type on topology change.
+     * @param stgy Current cache.
+     * @param topVer New topology version.
+     * @param discoCache @return New cache or null if message always reuses current cache.
+     */
+    public @Nullable DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy, AffinityTopologyVersion topVer,
+        DiscoCache discoCache);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 61be4c2..a6e9e56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -77,6 +78,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -103,6 +105,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
@@ -158,7 +161,7 @@ import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP;
 /**
  * Discovery SPI manager.
  */
-public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
+public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> implements ReuseDiscoCacheStrategy {
     /** Metrics update frequency. */
     private static final long METRICS_UPDATE_FREQ = 3000;
 
@@ -600,7 +603,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 if (snapshots != null)
                     topHist = snapshots;
 
-                boolean verChanged, incMinorTopVer = false, preventReuse = false;
+                boolean verChanged, incMinorTopVer = false;
 
                 if (type == EVT_NODE_METRICS_UPDATED)
                     verChanged = false;
@@ -655,8 +658,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                             new AffinityTopologyVersion(topVer, minorTopVer),
                             (ChangeGlobalStateMessage)customMsg,
                             discoCache());
-
-                        preventReuse = true;
                     }
                     else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
                         ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
@@ -673,8 +674,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                             customMsg,
                             new AffinityTopologyVersion(topVer, minorTopVer),
                             node);
-
-                        preventReuse = customMsg instanceof DynamicCacheChangeBatch;
                     }
 
                     if (incMinorTopVer) {
@@ -712,11 +711,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 // event notifications, since SPI notifies manager about all events from this listener.
                 if (verChanged) {
                     if (discoCache == null) {
-                        discoCache = incMinorTopVer && !preventReuse ? snapshot.discoCache.copy(nextTopVer) :
+                        discoCache = incMinorTopVer ?
+                            customMsg.reuseDiscoCache(GridDiscoveryManager.this, nextTopVer, snapshot.discoCache) :
                             createDiscoCache(nextTopVer,
-                                 ctx.state().clusterState(),
-                                 locNode,
-                                 topSnapshot);
+                                ctx.state().clusterState(),
+                                locNode,
+                                topSnapshot);
                     }
 
                     discoCacheHist.put(nextTopVer, discoCache);
@@ -2209,7 +2209,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param topSnapshot Topology snapshot.
      * @return Newly created discovery cache.
      */
-    @NotNull private DiscoCache createDiscoCache(
+    @NotNull DiscoCache createDiscoCache(
         AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
@@ -2248,38 +2248,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());
-
         Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 
-        for (ClusterNode node : allNodes) {
-            assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
-            assert !node.isDaemon();
+        fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches);
 
-            for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) {
-                CacheGroupAffinity grpAff = e.getValue();
-                Integer grpId = e.getKey();
-
-                if (CU.affinityNode(node, grpAff.cacheFilter)) {
-                    List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId);
+        IgniteProductVersion minVer = null;
 
-                    if (nodes == null)
-                        cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>());
-
-                    nodes.add(node);
-                }
-            }
-
-            for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
-                String cacheName = entry.getKey();
-                CachePredicate filter = entry.getValue();
+        for (int i = 0; i < allNodes.size(); i++) {
+            ClusterNode node = allNodes.get(i);
 
-                if (filter.cacheNode(node)) {
-                    if (!node.isLocal())
-                        rmtNodesWithCaches.add(node);
-
-                    addToMap(allCacheNodes, cacheName, node);
-                }
-            }
+            if (minVer == null)
+                minVer = node.version();
+            else if (node.version().compareTo(minVer) < 0)
+                minVer = node.version();
         }
 
         return new DiscoCache(
@@ -2295,7 +2276,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             Collections.unmodifiableMap(cacheGrpAffNodes),
             Collections.unmodifiableMap(nodeMap),
             alives,
-            null);
+            minVer);
     }
 
     /**
@@ -3047,4 +3028,86 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             return clientNodes.get(node.id());
         }
     }
+
+    /**
+     * Fills affinity node caches.
+     *
+     * @param allNodes All nodes.
+     * @param allCacheNodes All cache nodes.
+     * @param cacheGrpAffNodes Cache group aff nodes.
+     * @param rmtNodesWithCaches Rmt nodes with caches.
+     */
+    private void fillAffinityNodeCaches(List<ClusterNode> allNodes, Map<Integer, List<ClusterNode>> allCacheNodes,
+        Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Set<ClusterNode> rmtNodesWithCaches) {
+        for (ClusterNode node : allNodes) {
+            assert node.order() != 0 : "Invalid node order [locNode=" + localNode() + ", node=" + node + ']';
+            assert !node.isDaemon();
+
+            for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) {
+                CacheGroupAffinity grpAff = e.getValue();
+                Integer grpId = e.getKey();
+
+                if (CU.affinityNode(node, grpAff.cacheFilter)) {
+                    List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId);
+
+                    if (nodes == null)
+                        cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>());
+
+                    nodes.add(node);
+                }
+            }
+
+            for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+                String cacheName = entry.getKey();
+                CachePredicate filter = entry.getValue();
+
+                if (filter.cacheNode(node)) {
+                    if (!node.isLocal())
+                        rmtNodesWithCaches.add(node);
+
+                    addToMap(allCacheNodes, cacheName, node);
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache apply(DynamicCacheChangeBatch msg, AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        List<ClusterNode> allNodes = discoCache.allNodes();
+        Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+        Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());
+        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+
+        fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches);
+
+        return new DiscoCache(
+            topVer,
+            discoCache.state(),
+            discoCache.localNode(),
+            discoCache.remoteNodes(),
+            allNodes,
+            discoCache.serverNodes(),
+            discoCache.daemonNodes(),
+            U.sealList(rmtNodesWithCaches),
+            allCacheNodes,
+            cacheGrpAffNodes,
+            discoCache.nodeMap,
+            discoCache.alives,
+            discoCache.minimumNodeVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache apply(CacheAffinityChangeMessage msg, AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return discoCache.copy(topVer, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache apply(SnapshotDiscoveryMessage msg, AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return discoCache.copy(topVer, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache apply(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return discoCache.copy(topVer, ctx.state().clusterState());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ReuseDiscoCacheStrategy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ReuseDiscoCacheStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ReuseDiscoCacheStrategy.java
new file mode 100644
index 0000000..7c5652b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ReuseDiscoCacheStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+
+/**
+ * Defines messages which are willing to reuse discovery cache for efficiency reasons.
+ */
+public interface ReuseDiscoCacheStrategy {
+    /**
+     * Create discovery cache for {@link DynamicCacheChangeBatch} message.
+     *
+     * @param msg Message.
+     * @param topVer Topology version.
+     * @param discoCache Disco cache.
+     */
+    public DiscoCache apply(DynamicCacheChangeBatch msg, AffinityTopologyVersion topVer, DiscoCache discoCache);
+
+    /**
+     * Create discovery cache for {@link CacheAffinityChangeMessage} message.
+     *
+     * @param msg Message.
+     * @param topVer Topology version.
+     * @param discoCache Disco cache.
+     */
+    public DiscoCache apply(CacheAffinityChangeMessage msg, AffinityTopologyVersion topVer, DiscoCache discoCache);
+
+    /**
+     * Create discovery cache for {@link SnapshotDiscoveryMessage} message.
+     *
+     * @param msg Message.
+     * @param topVer Topology version.
+     * @param discoCache Disco cache.
+     */
+    public DiscoCache apply(SnapshotDiscoveryMessage msg, AffinityTopologyVersion topVer, DiscoCache discoCache);
+
+    /**
+     * Create discovery cache for {@link ChangeGlobalStateMessage} message.
+     *
+     * @param msg Message.
+     * @param topVer Topology version.
+     * @param discoCache Disco cache.
+     */
+    public DiscoCache apply(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer, DiscoCache discoCache);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index 8cff65e..8be4a5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -153,6 +156,19 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
         return false;
     }
 
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return stgy.apply(this, topVer, discoCache);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheAffinityChangeMessage.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
index 3d120f7..1be8fcd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -22,7 +22,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -169,6 +173,19 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage
         return false;
     }
 
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ClientCacheChangeDiscoveryMessage.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
index 44f6002..a9180c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -20,7 +20,11 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -101,6 +105,19 @@ public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMe
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ClientCacheChangeDummyDiscoveryMessage.class, this,

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index d5c820f..f01857e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.Set;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -74,6 +78,19 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     }
 
     /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return stgy.apply(this, topVer, discoCache);
+    }
+
+    /**
      * @return Collection of change requests.
      */
     public Collection<DynamicCacheChangeRequest> requests() {
@@ -137,4 +154,4 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     @Override public String toString() {
         return S.toString(DynamicCacheChangeBatch.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
index ef5370e..c195731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
@@ -16,7 +16,10 @@
  */
 package org.apache.ignite.internal.processors.cache.binary;
 
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -67,6 +70,19 @@ public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage {
         return true;
     }
 
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
     /** */
     int acceptedVersion() {
         return acceptedVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
index 715e668..160faf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -20,7 +20,10 @@ import java.util.UUID;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.binary.BinaryMetadataHandler;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -131,6 +134,19 @@ public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessa
     }
 
     /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
+    /**
      * @param err Error caused this update to be rejected.
      */
     void markRejected(BinaryObjectException err) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
index 0771198..79342cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
@@ -18,7 +18,11 @@
 package org.apache.ignite.internal.processors.cluster;
 
 import java.util.UUID;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -79,6 +83,19 @@ public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage {
         return false;
     }
 
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ChangeGlobalStateFinishMessage.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index 6579399..eed04fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cluster;
 
 import java.util.List;
 import java.util.UUID;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -112,7 +116,20 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
         return false;
     }
 
-   /**
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return stgy.apply(this, topVer, discoCache);
+    }
+
+    /**
     * @return Node initiated state change.
     */
     public UUID initiatorNodeId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index 01a95df..8279b07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -18,8 +18,13 @@
 package org.apache.ignite.internal.processors.continuous;
 
 import java.util.UUID;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -57,4 +62,17 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
     @Override public boolean isMutable() {
         return false;
     }
+
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 1765f2c..4063e05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -92,4 +92,4 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     @Override public String toString() {
         return S.toString(StartRoutineAckDiscoveryMessage.class, this, "routineId", routineId());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 320226b..82996d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -132,4 +132,4 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     @Override public String toString() {
         return S.toString(StartRoutineDiscoveryMessage.class, this, "routineId", routineId());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
index e6305c7..79d8b29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -45,4 +45,4 @@ public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     @Override public String toString() {
         return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
index 30d12d1..f6b18fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
@@ -45,4 +45,4 @@ public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage {
     @Override public String toString() {
         return S.toString(StopRoutineDiscoveryMessage.class, this, "routineId", routineId());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
index 23c2858..52eac25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.processors.marshaller;
 
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -59,6 +62,19 @@ public class MappingAcceptedMessage implements DiscoveryCustomMessage {
         return false;
     }
 
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
     /** */
     MarshallerMappingItem getMappingItem() {
         return item;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
index 33a2168..8c2abaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
@@ -18,7 +18,11 @@
 package org.apache.ignite.internal.processors.marshaller;
 
 import java.util.UUID;
+
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -94,6 +98,19 @@ public class MappingProposedMessage implements DiscoveryCustomMessage {
         return true;
     }
 
+    /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
     /** */
     MarshallerMappingItem mappingItem() {
         return mappingItem;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
index 9fdc6c3..e055606 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.processors.query.schema.message;
 
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.ReuseDiscoCacheStrategy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract discovery message for schema operations.
@@ -52,6 +56,19 @@ public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomM
     }
 
     /**
+     * {@inheritDoc}
+     * @param stgy Reuse strategy.
+     * @param topVer New topology version.
+     * @param discoCache Discovery cache
+     *
+     * @return Reused discovery cache if possible.
+     */
+    @Nullable @Override public DiscoCache reuseDiscoCache(ReuseDiscoCacheStrategy stgy,
+        AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        return null;
+    }
+
+    /**
      * @return Operation.
      */
     public SchemaAbstractOperation operation() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c911b590/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 5fd2606..0e1270b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -99,9 +99,8 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag
      * @param err Error.
      */
     public void onError(SchemaOperationException err) {
-        if (!hasError()) {
+        if (!hasError())
             this.err = err;
-        }
     }
 
     /**