You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 14:38:20 UTC

[6/9] ignite git commit: IGNITE-4302 - Use custom messages to exchange binary metadata - Fixes #1655.

IGNITE-4302 - Use custom messages to exchange binary metadata - Fixes #1655.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44cf1d21
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44cf1d21
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44cf1d21

Branch: refs/heads/ignite-4003
Commit: 44cf1d21ad4363041043ad88161a954c738f20eb
Parents: f056f2e
Author: Sergey Chugunov <se...@gmail.com>
Authored: Thu Mar 30 11:40:35 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Mar 30 11:40:35 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |   5 +-
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../binary/BinaryCachingMetadataHandler.java    |   6 +
 .../ignite/internal/binary/BinaryContext.java   |  10 +
 .../ignite/internal/binary/BinaryMetadata.java  |  27 +-
 .../internal/binary/BinaryMetadataHandler.java  |  18 +-
 .../binary/BinaryNoopMetadataHandler.java       |   5 +
 .../internal/binary/BinaryReaderExImpl.java     |   2 +-
 .../ignite/internal/binary/BinaryUtils.java     |   2 +-
 .../communication/GridIoMessageFactory.java     |  12 +
 .../discovery/DiscoveryCustomMessage.java       |  48 ++
 .../eventstorage/GridEventStorageManager.java   |   6 +-
 .../cache/binary/BinaryMetadataHolder.java      |  73 +++
 .../cache/binary/BinaryMetadataTransport.java   | 641 +++++++++++++++++++
 .../binary/BinaryMetadataUpdatedListener.java   |  29 +
 .../binary/CacheObjectBinaryProcessor.java      |   9 +
 .../binary/CacheObjectBinaryProcessorImpl.java  | 474 ++++----------
 .../binary/ClientMetadataRequestFuture.java     | 161 +++++
 .../cache/binary/MetadataRequestMessage.java    | 122 ++++
 .../cache/binary/MetadataResponseMessage.java   | 195 ++++++
 .../binary/MetadataUpdateAcceptedMessage.java   |  96 +++
 .../binary/MetadataUpdateProposedMessage.java   | 224 +++++++
 .../cache/binary/MetadataUpdateResult.java      |  96 +++
 .../continuous/CacheContinuousQueryHandler.java |   4 +-
 .../GridMarshallerMappingProcessor.java         |   7 +-
 .../service/GridServiceProcessor.java           |   5 +
 .../internal/GridTaskExecutionSelfTest.java     |   2 +
 .../binary/TestCachingMetadataHandler.java      |   5 +
 .../binary/BinaryMetadataUpdatesFlowTest.java   | 592 +++++++++++++++++
 ...naryObjectMetadataExchangeMultinodeTest.java | 463 ++++++++++++++
 .../ignite/testframework/GridTestUtils.java     |  57 ++
 .../IgniteBinaryObjectsTestSuite.java           |   4 +
 32 files changed, 3056 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 560d7f6..dc14ae3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -49,7 +49,10 @@ public interface GridComponent {
         CLUSTER_PROC,
 
         /** */
-        MARSHALLER_PROC
+        MARSHALLER_PROC,
+
+        /** */
+        BINARY_PROC
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 3ffc56e..6efc0d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -103,7 +103,10 @@ public enum GridTopic {
     TOPIC_MAPPING_MARSH,
 
     /** */
-    TOPIC_HADOOP_MSG;
+    TOPIC_HADOOP_MSG,
+
+    /** */
+    TOPIC_METADATA_REQ;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
index 39189f0..24a393d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
@@ -67,4 +67,10 @@ public class BinaryCachingMetadataHandler implements BinaryMetadataHandler {
     @Override public synchronized BinaryType metadata(int typeId) throws BinaryObjectException {
         return metas.get(typeId);
     }
+
+    /** {@inheritDoc} Returns {@code null} if the type is unknown or has no schema with the given ID. */
+    @Override public synchronized BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+        BinaryTypeImpl type = (BinaryTypeImpl) metas.get(typeId);
+        return type != null && type.metadata().hasSchema(schemaId) ? type : null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index e5b6bda..d918aa3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -1224,6 +1224,16 @@ public class BinaryContext {
 
     /**
      * @param typeId Type ID.
+     * @param schemaId Schema ID.
+     * @return Metadata returned by the metadata handler, or {@code null} if no handler is set.
+     * @throws BinaryObjectException In case of error.
+     */
+    public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+        return metaHnd != null ? metaHnd.metadata(typeId, schemaId): null;
+    }
+
+    /**
+     * @param typeId Type ID.
      * @return Affinity key field name.
      */
     public String affinityKeyFieldName(int typeId) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
index d1c79f3..393d9ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
@@ -28,7 +28,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-
+import java.util.Set;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -60,6 +60,9 @@ public class BinaryMetadata implements Externalizable {
     /** Schemas associated with type. */
     private Collection<BinarySchema> schemas;
 
+    /** Schema IDs registered for this type. */
+    private Set<Integer> schemaIds;
+
     /** Whether this is enum type. */
     private boolean isEnum;
 
@@ -89,6 +92,16 @@ public class BinaryMetadata implements Externalizable {
         this.fields = fields;
         this.affKeyFieldName = affKeyFieldName;
         this.schemas = schemas;
+
+        if (schemas != null) {
+            schemaIds = U.newHashSet(schemas.size());
+
+            for (BinarySchema schema : schemas)
+                schemaIds.add(schema.schemaId());
+        }
+        else
+            schemaIds = Collections.emptySet();
+
         this.isEnum = isEnum;
     }
 
@@ -145,6 +158,14 @@ public class BinaryMetadata implements Externalizable {
     }
 
     /**
+     * @param schemaId Schema ID.
+     * @return {@code true} if <b>BinaryMetadata</b> instance has schema with ID specified, {@code false} otherwise.
+     */
+    public boolean hasSchema(int schemaId) {
+        return schemaIds.contains(schemaId);
+    }
+
+    /**
      * @return {@code True} if this is enum type.
      */
     public boolean isEnum() {
@@ -249,12 +270,16 @@ public class BinaryMetadata implements Externalizable {
         else {
             schemas = new ArrayList<>();
 
+            schemaIds = U.newHashSet(schemasSize);
+
             for (int i = 0; i < schemasSize; i++) {
                 BinarySchema schema = new BinarySchema();
 
                 schema.readFrom(in);
 
                 schemas.add(schema);
+
+                schemaIds.add(schema.schemaId());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
index 29ff7b3..748a283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
@@ -28,8 +28,8 @@ public interface BinaryMetadataHandler {
      * Adds meta data.
      *
      * @param typeId Type ID.
-     * @param meta Meta data.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @param meta Metadata.
+     * @throws BinaryObjectException In case of error.
      */
     public void addMeta(int typeId, BinaryType meta) throws BinaryObjectException;
 
@@ -37,8 +37,18 @@ public interface BinaryMetadataHandler {
      * Gets meta data for provided type ID.
      *
      * @param typeId Type ID.
-     * @return Meta data.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @return Metadata.
+     * @throws BinaryObjectException In case of error.
      */
     public BinaryType metadata(int typeId) throws BinaryObjectException;
+
+    /**
+     * Gets metadata for provided type ID and schema ID.
+     *
+     * @param typeId Type ID.
+     * @param schemaId Schema ID.
+     * @return Metadata.
+     * @throws BinaryObjectException In case of error.
+     */
+    public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
index 9c0c37d..770d9c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
@@ -50,4 +50,9 @@ public class BinaryNoopMetadataHandler implements BinaryMetadataHandler {
     @Override public BinaryType metadata(int typeId) throws BinaryObjectException {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
index f851b68..d6fefe3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
@@ -1984,7 +1984,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
 
         if (schema == null) {
             if (fieldIdLen != BinaryUtils.FIELD_ID_LEN) {
-                BinaryTypeImpl type = (BinaryTypeImpl)ctx.metadata(typeId);
+                BinaryTypeImpl type = (BinaryTypeImpl) ctx.metadata(typeId, schemaId);
 
                 if (type == null || type.metadata() == null)
                     throw new BinaryObjectException("Cannot find metadata for object with compact footer: " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index 41ec078..c59b8b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -1562,7 +1562,7 @@ public class BinaryUtils {
         Class cls;
 
         if (typeId != GridBinaryMarshaller.UNREGISTERED_TYPE_ID)
-            cls = ctx.descriptorForTypeId(true, typeId, ldr, false).describedClass();
+            cls = ctx.descriptorForTypeId(true, typeId, ldr, true).describedClass();
         else {
             String clsName = doReadClassName(in);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7bf3de2..737d047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEvictionRequest;
 import org.apache.ignite.internal.processors.cache.GridCacheEvictionResponse;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
+import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -651,6 +653,16 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 80:
+                msg = new MetadataRequestMessage();
+
+                break;
+
+            case 81:
+                msg = new MetadataResponseMessage();
+
+                break;
+
             case 82:
                 msg = new JobStealingRequest();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index d85075e..f908b59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -19,10 +19,58 @@ package org.apache.ignite.internal.managers.discovery;
 
 import java.io.Serializable;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
+ * <b>DiscoveryCustomMessage</b> messages are handled by discovery protocol which provides some guarantees around them.
  *
+ * When a node sends a <b>DiscoveryCustomMessage</b> via {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)},
+ * the message first goes to the current coordinator, is verified there, and only then is sent to the cluster.
+ * Only after verification is it delivered to listeners on all nodes, starting from the coordinator.
+ *
+ * To register a listener {@link GridDiscoveryManager#setCustomEventListener(Class, CustomEventListener)} method is used.
+ *
+ * Discovery protocol guarantees include:
+ * <ol>
+ *     <li>
+ *         All discovery messages are observed by all nodes in exactly the same order,
+ *         it is guaranteed by handling them in single-threaded mode.
+ *     </li>
+ *     <li>
+ *         New server node joining process in default implementation involves two passes of different messages across the cluster:
+ *         {@link TcpDiscoveryNodeAddedMessage} and
+ *         {@link TcpDiscoveryNodeAddFinishedMessage} messages.
+ *         It is guaranteed that all discovery messages observed by coordinator in between these two messages
+ *         are reordered and guaranteed to be delivered to newly joined node.
+ *     </li>
+ * </ol>
+ *
+ * Yet there are some features and limitations one should be aware of when using custom discovery messaging mechanism:
+ * <ol>
+ *     <li>
+ *         Guarantee #2 doesn't encompass <b>DiscoveryCustomMessage</b>s created automatically on
+ *         {@link DiscoveryCustomMessage#ackMessage()} method call.
+ *
+ *         If there were messages of this type in between <b>TcpDiscoveryNodeAddedMessage</b> and
+ *         <b>TcpDiscoveryNodeAddFinishedMessage</b> messages, they won't be delivered to new joiner node.
+ *     </li>
+ *     <li>
+ *         There is no guarantee that a given <b>DiscoveryCustomMessage</b> is delivered only once.
+ *         If a node fails, the preceding node in the ring may resend messages
+ *         it believes the failed node did not forward.
+ *         Duplicate messages are not filtered out on the receiver side.
+ *     </li>
+ *     <li>
+ *         <b>DiscoveryCustomMessage</b>s are delivered to client nodes in asynchronous fashion
+ *         as clients don't participate in the cluster ring.
+ *     </li>
+ *     <li>
+ *         Any blocking operations like obtaining locks or doing I/O <b>must</b> be avoided in message handlers
+ *         as they may lead to deadlocks and cluster failures.
+ *     </li>
+ * </ol>
  */
 public interface DiscoveryCustomMessage extends Serializable {
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 9194b10..77bd7d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -71,6 +70,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_EVENT;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
@@ -494,7 +494,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      * @return {@code true} if this is a system hidden event.
      */
     private boolean isHiddenEvent(int type) {
-        return type == EVT_NODE_METRICS_UPDATED;
+        return type == EVT_NODE_METRICS_UPDATED || type == EVT_DISCOVERY_CUSTOM_EVT;
     }
 
     /**
@@ -507,7 +507,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      * @return {@code true} if this is an internal event.
      */
     private boolean isInternalEvent(int type) {
-        return type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT || F.contains(EVTS_DISCOVERY_ALL, type);
+        return type == EVT_DISCOVERY_CUSTOM_EVT || F.contains(EVTS_DISCOVERY_ALL, type);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
new file mode 100644
index 0000000..bbc929e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+
+/**
+ * Wrapper for {@link BinaryMetadata} which is stored in metadata local cache on each node.
+ * Used internally to track version counters (see javadoc for {@link MetadataUpdateProposedMessage} for more details).
+ */
+final class BinaryMetadataHolder implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Wrapped metadata; asserted to be non-null in the constructor. */
+    private final BinaryMetadata metadata;
+
+    /** Pending version counter, stored as passed to the constructor. */
+    private final int pendingVer;
+
+    /** Accepted version counter, stored as passed to the constructor. */
+    private final int acceptedVer;
+
+
+    /**
+     * @param metadata Metadata, must not be {@code null}.
+     * @param pendingVer Version of this metadata - how many updates were issued for this type.
+     * @param acceptedVer Accepted version. NOTE(review): was documented as "Pending updates count" - confirm.
+     */
+    BinaryMetadataHolder(BinaryMetadata metadata, int pendingVer, int acceptedVer) {
+        assert metadata != null;
+
+        this.metadata = metadata;
+        this.pendingVer = pendingVer;
+        this.acceptedVer = acceptedVer;
+    }
+
+    /**
+     * @return Wrapped metadata.
+     */
+    BinaryMetadata metadata() {
+        return metadata;
+    }
+
+    /**
+     * @return Pending version counter.
+     */
+    int pendingVersion() {
+        return pendingVer;
+    }
+
+    /**
+     * @return Accepted version counter.
+     */
+    int acceptedVersion() {
+        return acceptedVer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
new file mode 100644
index 0000000..770fa32
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ * Provides API for discovery-based metadata exchange protocol and communication SPI-based metadata request protocol.
+ *
+ * It is responsible for sending update and metadata requests and manages message listeners for them.
+ *
+ * It also manages synchronization logic (blocking/unblocking threads requesting updates or up-to-date metadata etc)
+ * around protocols.
+ */
+final class BinaryMetadataTransport {
+    /** */
+    private final GridDiscoveryManager discoMgr;
+
+    /** */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final UUID locNodeId;
+
+    /** */
+    private final boolean clientNode;
+
+    /** */
+    private final ConcurrentMap<Integer, BinaryMetadataHolder> metaLocCache;
+
+    /** */
+    private final Queue<MetadataUpdateResultFuture> unlabeledFutures = new ConcurrentLinkedQueue<>();
+
+    /** */
+    private final ConcurrentMap<SyncKey, MetadataUpdateResultFuture> syncMap = new ConcurrentHashMap8<>();
+
+    /** */
+    private final ConcurrentMap<Integer, ClientMetadataRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+
+    /** */
+    private volatile boolean stopping;
+
+    /** */
+    private final List<BinaryMetadataUpdatedListener> binaryUpdatedLsnrs = new CopyOnWriteArrayList<>();
+
+    /**
+     * Creates the transport and registers its discovery, communication and local event listeners.
+     *
+     * @param metaLocCache Metadata local cache.
+     * @param ctx Context.
+     * @param log Logger.
+     */
+    BinaryMetadataTransport(ConcurrentMap<Integer, BinaryMetadataHolder> metaLocCache, final GridKernalContext ctx, IgniteLogger log) {
+        this.metaLocCache = metaLocCache;
+        this.ctx = ctx;
+        this.log = log;
+
+        discoMgr = ctx.discovery();
+        locNodeId = ctx.localNodeId();
+        clientNode = ctx.clientNode();
+
+        // Both node kinds take part in the discovery-based update protocol.
+        discoMgr.setCustomEventListener(MetadataUpdateProposedMessage.class, new MetadataUpdateProposedListener());
+        discoMgr.setCustomEventListener(MetadataUpdateAcceptedMessage.class, new MetadataUpdateAcceptedListener());
+
+        GridIoManager ioMgr = ctx.io();
+
+        if (!clientNode) {
+            // Server side of the request protocol: answer metadata requests.
+            ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new MetadataRequestListener(ioMgr));
+
+            return;
+        }
+
+        // Client side of the request protocol: responses arrive on the same topic.
+        ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new MetadataResponseListener());
+
+        // Let pending client requests react to a node leaving or failing.
+        GridLocalEventListener nodeLeftLsnr = new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                DiscoveryEvent discoEvt = (DiscoveryEvent) evt;
+
+                if (ctx.isStopping())
+                    return;
+
+                for (ClientMetadataRequestFuture reqFut : clientReqSyncMap.values())
+                    reqFut.onNodeLeft(discoEvt.eventNode().id());
+            }
+        };
+
+        ctx.event().addLocalEventListener(nodeLeftLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+    }
+
+    /**
+     * Adds BinaryMetadata updates {@link BinaryMetadataUpdatedListener listener} to transport.
+     * Registered listeners are invoked from the {@link MetadataUpdateAcceptedMessage} handler.
+     *
+     * @param lsnr Listener.
+     */
+    void addBinaryMetadataUpdateListener(BinaryMetadataUpdatedListener lsnr) {
+        binaryUpdatedLsnrs.add(lsnr);
+    }
+
+    /**
+     * Sends request to cluster proposing update for given metadata.
+     * <p>
+     * The returned future is queued in {@code unlabeledFutures}; the proposed-message listener takes
+     * futures from that queue in FIFO order, so the queue must only hold futures whose proposal
+     * was actually sent.
+     *
+     * @param metadata Metadata proposed for update.
+     * @return Future to wait for update result on. Already completed with an "update disabled" result
+     *      if the transport is stopping.
+     * @throws IgniteCheckedException If the proposal could not be sent.
+     */
+    GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) throws IgniteCheckedException {
+        MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture();
+
+        synchronized (this) {
+            // Enqueue before reading the flag (same order as before), so that a concurrent stop()
+            // either is observed here or finds the future while cancelling.
+            unlabeledFutures.add(resFut);
+
+            if (stopping) {
+                // No proposal is sent: do not leave an orphan in the FIFO queue.
+                unlabeledFutures.remove(resFut);
+
+                resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
+
+                return resFut;
+            }
+
+            try {
+                discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, locNodeId));
+            }
+            catch (IgniteCheckedException | RuntimeException e) {
+                // Nothing was sent, so no proposed message will ever poll this future. Leaving it
+                // queued would make the next local proposal pick up the wrong future.
+                unlabeledFutures.remove(resFut);
+
+                throw e;
+            }
+        }
+
+        return resFut;
+    }
+
+    /**
+     * Allows thread to wait for a metadata of given typeId and version to be accepted by the cluster.
+     * <p>
+     * Concurrent callers waiting for the same (typeId, version) pair share one future.
+     *
+     * @param typeId ID of binary type.
+     * @param ver version of given binary type (see {@link MetadataUpdateProposedMessage} javadoc for more details).
+     * @return future to wait for update result on.
+     */
+    GridFutureAdapter<MetadataUpdateResult> awaitMetadataUpdate(int typeId, int ver) {
+        SyncKey key = new SyncKey(typeId, ver);
+        MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(key);
+
+        MetadataUpdateResultFuture oldFut = syncMap.putIfAbsent(key, resFut);
+
+        if (oldFut != null)
+            resFut = oldFut;
+
+        // The future is registered before the accepted version is checked
+        // (presumably so that an acknowledgment arriving in between is not missed).
+        BinaryMetadataHolder holder = metaLocCache.get(typeId);
+
+        // The local cache may hold no entry for the type (e.g. it is cleared on client disconnect);
+        // in that case keep waiting instead of failing with NullPointerException.
+        if (holder != null && holder.acceptedVersion() >= ver)
+            resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
+
+        return resFut;
+    }
+
+    /**
+     * Requests the latest binary metadata for the given type from the cluster. Intended for a client node
+     * that has detected obsolete metadata in its local cache.
+     * <p>
+     * At most one request per type is in flight: if a request future is already registered for
+     * {@code typeId}, it is returned and no new request is sent.
+     *
+     * @param typeId ID of binary type.
+     * @return future to wait for request arrival on.
+     */
+    GridFutureAdapter<MetadataUpdateResult> requestUpToDateMetadata(int typeId) {
+        ClientMetadataRequestFuture candidate = new ClientMetadataRequestFuture(ctx, typeId, clientReqSyncMap);
+
+        ClientMetadataRequestFuture registered = clientReqSyncMap.putIfAbsent(typeId, candidate);
+
+        if (registered == null) {
+            // This call won the registration, so it is the one that sends the request.
+            candidate.requestMetadata();
+
+            registered = candidate;
+        }
+
+        return registered;
+    }
+
+    /**
+     * Marks the transport as stopping and completes all outstanding futures with an "update disabled" result.
+     */
+    void stop() {
+        // Raise the flag first: requestMetadataUpdate() and initSyncFor() check it and complete
+        // their futures themselves once it is set.
+        stopping = true;
+
+        cancelFutures(MetadataUpdateResult.createUpdateDisabledResult());
+    }
+
+    /**
+     * Completes all outstanding futures with a failure result reporting that the client node disconnected.
+     */
+    void onDisconnected() {
+        BinaryObjectException disconnectedErr =
+            new BinaryObjectException("Failed to update or wait for metadata, client node disconnected");
+
+        cancelFutures(MetadataUpdateResult.createFailureResult(disconnectedErr));
+    }
+
+    /**
+     * Completes every outstanding future tracked by this transport with the given result.
+     * Futures are completed in place; the collections are iterated, not drained.
+     *
+     * @param res result to cancel futures with.
+     */
+    private void cancelFutures(MetadataUpdateResult res) {
+        // Update requests whose proposed message has not been processed locally yet.
+        for (MetadataUpdateResultFuture updFut : unlabeledFutures) {
+            updFut.onDone(res);
+        }
+
+        // Futures waiting for a particular (typeId, version) to be accepted.
+        for (MetadataUpdateResultFuture syncFut : syncMap.values()) {
+            syncFut.onDone(res);
+        }
+
+        // Client requests for up-to-date metadata.
+        for (ClientMetadataRequestFuture reqFut : clientReqSyncMap.values()) {
+            reqFut.onDone(res);
+        }
+    }
+
+    /**
+     * Handles {@link MetadataUpdateProposedMessage} discovery events: assigns version counters to a proposal
+     * that does not carry them yet, merges proposed metadata with the locally cached one, updates the local
+     * cache and, on the node that originated the proposal, binds the waiting future to the assigned version.
+     */
+    private final class MetadataUpdateProposedListener implements CustomEventListener<MetadataUpdateProposedMessage> {
+
+        /** {@inheritDoc} */
+        @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateProposedMessage msg) {
+            int typeId = msg.typeId();
+
+            BinaryMetadataHolder holder = metaLocCache.get(typeId);
+
+            int pendingVer;
+            int acceptedVer;
+
+            // Zero pending version means the message has no versions assigned yet
+            // (presumably it is being handled for the first time - confirm against the message javadoc).
+            if (msg.pendingVersion() == 0) {
+                if (holder != null) {
+                    pendingVer = holder.pendingVersion() + 1;
+                    acceptedVer = holder.acceptedVersion();
+                }
+                else {
+                    pendingVer = 1;
+                    acceptedVer = 0;
+                }
+
+                // Versions and merged metadata are written back into the message itself.
+                msg.pendingVersion(pendingVer);
+                msg.acceptedVersion(acceptedVer);
+
+                BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
+
+                try {
+                    BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+
+                    msg.metadata(mergedMeta);
+                }
+                catch (BinaryObjectException err) {
+                    log.warning("Exception with merging metadata for typeId: " + typeId, err);
+
+                    // A merge conflict rejects the proposal; the error travels with the message.
+                    msg.markRejected(err);
+                }
+            }
+            else {
+                pendingVer = msg.pendingVersion();
+                acceptedVer = msg.acceptedVersion();
+            }
+
+            if (locNodeId.equals(msg.origNodeId())) {
+                // This node originated the proposal: take the oldest queued request future (FIFO pairing).
+                // NOTE(review): poll() returns null on an empty queue and fut is used unchecked below.
+                MetadataUpdateResultFuture fut = unlabeledFutures.poll();
+
+                if (msg.rejected())
+                    fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
+                else {
+                    if (clientNode) {
+                        BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer);
+
+                        holder = metaLocCache.putIfAbsent(typeId, newHolder);
+
+                        if (holder != null) {
+                            boolean obsoleteUpd = false;
+
+                            // Compare-and-swap loop: install newHolder unless the cached holder is already newer.
+                            do {
+                                holder = metaLocCache.get(typeId);
+
+                                if (obsoleteUpdate(
+                                        holder.pendingVersion(),
+                                        holder.acceptedVersion(),
+                                        pendingVer,
+                                        acceptedVer)) {
+                                    obsoleteUpd = true;
+
+                                    // Local cache is ahead of this proposal: report success right away.
+                                    fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+
+                                    break;
+                                }
+                            }
+                            while (!metaLocCache.replace(typeId, holder, newHolder));
+
+                            if (!obsoleteUpd)
+                                initSyncFor(typeId, pendingVer, fut);
+                        }
+                        else
+                            initSyncFor(typeId, pendingVer, fut);
+                    }
+                    else {
+                        // Non-client node: bind the future to the version first, then overwrite the cache entry.
+                        initSyncFor(typeId, pendingVer, fut);
+
+                        metaLocCache.put(typeId, new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer));
+                    }
+                }
+            }
+            else {
+                // Proposal originated on another node: only the local cache is updated.
+                if (!msg.rejected()) {
+                    BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
+
+                    try {
+                        BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+
+                        BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
+
+                        if (clientNode) {
+                            holder = metaLocCache.putIfAbsent(typeId, newHolder);
+
+                            if (holder != null) {
+                                // Same compare-and-swap scheme as above: skip the update if the cache is newer.
+                                do {
+                                    holder = metaLocCache.get(typeId);
+
+                                    if (obsoleteUpdate(
+                                            holder.pendingVersion(),
+                                            holder.acceptedVersion(),
+                                            pendingVer,
+                                            acceptedVer))
+                                        break;
+
+                                } while (!metaLocCache.replace(typeId, holder, newHolder));
+                            }
+                        }
+                        else
+                            metaLocCache.put(typeId, newHolder);
+                    }
+                    catch (BinaryObjectException ignored) {
+                        // A merge failure is not expected for a non-rejected proposal; it is only
+                        // reported when assertions are enabled and is otherwise silently ignored.
+                        assert false : msg;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Binds the given future to the (typeId, pendingVer) pair so that it is completed when that version
+     * is accepted. If the transport is stopping, the future is completed with an "update disabled"
+     * result instead and is not registered.
+     *
+     * @param typeId Type ID.
+     * @param pendingVer Pending version.
+     * @param fut Future.
+     */
+    private void initSyncFor(int typeId, int pendingVer, MetadataUpdateResultFuture fut) {
+        if (!stopping) {
+            SyncKey syncKey = new SyncKey(typeId, pendingVer);
+
+            syncMap.put(syncKey, fut);
+
+            // Remember the key so that the future can unregister itself on completion.
+            fut.key(syncKey);
+        }
+        else
+            fut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
+    }
+
+    /**
+     * Handles {@link MetadataUpdateAcceptedMessage} discovery events: raises the accepted version of the
+     * locally cached metadata, notifies registered {@link BinaryMetadataUpdatedListener}s and completes
+     * the future waiting for the accepted (typeId, version) pair, if any.
+     */
+    private final class MetadataUpdateAcceptedListener implements CustomEventListener<MetadataUpdateAcceptedMessage> {
+
+        /** {@inheritDoc} */
+        @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateAcceptedMessage msg) {
+            if (msg.duplicated())
+                return;
+
+            int typeId = msg.typeId();
+
+            BinaryMetadataHolder holder = metaLocCache.get(typeId);
+
+            assert holder != null : "No metadata found for typeId " + typeId;
+
+            int newAcceptedVer = msg.acceptedVersion();
+
+            if (clientNode) {
+                // Compare-and-swap loop. The replacement holder is rebuilt from the holder read on each
+                // attempt: building it once from an earlier snapshot could overwrite newer metadata or a
+                // newer pending version installed concurrently.
+                for (;;) {
+                    holder = metaLocCache.get(typeId);
+
+                    // Cached holder already has a newer accepted version: keep it.
+                    if (holder.acceptedVersion() > newAcceptedVer)
+                        break;
+
+                    BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(),
+                        holder.pendingVersion(), newAcceptedVer);
+
+                    if (metaLocCache.replace(typeId, holder, newHolder))
+                        break;
+                }
+            }
+            else {
+                int oldAcceptedVer = holder.acceptedVersion();
+
+                if (oldAcceptedVer >= newAcceptedVer) {
+                    //this is duplicate ack
+                    msg.duplicated(true);
+
+                    return;
+                }
+
+                metaLocCache.put(typeId, new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer));
+            }
+
+            for (BinaryMetadataUpdatedListener lsnr : binaryUpdatedLsnrs)
+                lsnr.binaryMetadataUpdated(holder.metadata());
+
+            GridFutureAdapter<MetadataUpdateResult> fut = syncMap.get(new SyncKey(typeId, newAcceptedVer));
+
+            if (fut != null)
+                fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+        }
+    }
+
+    /**
+     * Future that blocks threads until a particular metadata update event happens,
+     * e.g. arriving {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
+     * Once completed, the future removes itself from {@code syncMap} if it was registered there.
+     */
+    private final class MetadataUpdateResultFuture extends GridFutureAdapter<MetadataUpdateResult> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Key this future is registered under in syncMap, {@code null} while unregistered. */
+        private SyncKey key;
+
+        /** Creates a future that is not bound to any key yet. */
+        MetadataUpdateResultFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param key key in syncMap this future was added under.
+         */
+        MetadataUpdateResultFuture(SyncKey key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable MetadataUpdateResult res, @Nullable Throwable err) {
+            assert res != null;
+
+            if (!super.onDone(res, err))
+                return false;
+
+            // Completed by this call: unregister, but only if the mapping still points to this future.
+            if (key != null)
+                syncMap.remove(key, this);
+
+            return true;
+        }
+
+        /**
+         * @param key Key.
+         */
+        void key(SyncKey key) {
+            this.key = key;
+        }
+    }
+
+    /**
+     * Key for mapping arriving {@link MetadataUpdateAcceptedMessage} messages
+     * to {@link MetadataUpdateResultFuture}s other threads may be waiting on.
+     * Two keys are equal when both type ID and version match.
+     */
+    private static final class SyncKey {
+        /** Binary type ID. */
+        private final int typeId;
+
+        /** Metadata version. */
+        private final int ver;
+
+        /**
+         * @param typeId Type id.
+         * @param ver Version.
+         */
+        private SyncKey(int typeId, int ver) {
+            this.typeId = typeId;
+            this.ver = ver;
+        }
+
+        /**
+         * @return Type ID.
+         */
+        int typeId() {
+            return typeId;
+        }
+
+        /**
+         * @return Version.
+         */
+        int version() {
+            return ver;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            // Weighted combination: a plain sum gives the same hash to (t, v) and (t + 1, v - 1).
+            return 31 * typeId + ver;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (!(o instanceof SyncKey))
+                return false;
+
+            SyncKey that = (SyncKey) o;
+
+            return (typeId == that.typeId) && (ver == that.ver);
+        }
+    }
+
+    /**
+     * Listener registered on non-client nodes; answers metadata requests with the locally cached holder
+     * for the requested type.
+     */
+    private final class MetadataRequestListener implements GridMessageListener {
+        /** IO manager used to send responses. */
+        private final GridIoManager ioMgr;
+
+        /**
+         * @param ioMgr IO manager.
+         */
+        MetadataRequestListener(GridIoManager ioMgr) {
+            this.ioMgr = ioMgr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            assert msg instanceof MetadataRequestMessage : msg;
+
+            int typeId = ((MetadataRequestMessage) msg).typeId();
+
+            BinaryMetadataHolder cached = metaLocCache.get(typeId);
+
+            MetadataResponseMessage resp = new MetadataResponseMessage(typeId);
+
+            // Bytes stay null when nothing is cached for the type or when marshalling failed.
+            resp.binaryMetadataBytes(cached == null ? null : marshalOrMarkError(cached, typeId, resp));
+
+            try {
+                ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_METADATA_REQ, resp, SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send up-to-date metadata response.", e);
+            }
+        }
+
+        /**
+         * Marshals the holder; on failure logs the error and flags the response as erroneous.
+         *
+         * @param cached Holder to marshal.
+         * @param typeId Type ID, used for logging.
+         * @param resp Response to flag on failure.
+         * @return Marshalled bytes or {@code null} if marshalling failed.
+         */
+        private byte[] marshalOrMarkError(BinaryMetadataHolder cached, int typeId, MetadataResponseMessage resp) {
+            try {
+                return U.marshal(ctx, cached);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to marshal binary metadata for [typeId: " + typeId + "]", e);
+
+                resp.markErrorOnRequest();
+
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Listener is registered on each client node and listens for metadata responses from cluster.
+     * A received holder is installed into the local cache unless the cached one is already newer,
+     * after which the pending request future for the type is completed.
+     */
+    private final class MetadataResponseListener implements GridMessageListener {
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            assert msg instanceof MetadataResponseMessage : msg;
+
+            MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;
+
+            int typeId = msg0.typeId();
+
+            byte[] binMetaBytes = msg0.binaryMetadataBytes();
+
+            ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId);
+
+            // Nobody is waiting for this response any more.
+            if (fut == null)
+                return;
+
+            if (msg0.metadataNotFound()) {
+                fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+
+                return;
+            }
+
+            try {
+                BinaryMetadataHolder newHolder = U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config()));
+
+                // Install newHolder unless the cache already holds a newer one. Each attempt starts with
+                // putIfAbsent, so an entry removed concurrently (e.g. the cache being cleared) is handled
+                // by a plain insert instead of calling replace() with a null expected value.
+                for (;;) {
+                    BinaryMetadataHolder oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
+
+                    if (oldHolder == null)
+                        break;
+
+                    if (obsoleteUpdate(
+                            oldHolder.pendingVersion(),
+                            oldHolder.acceptedVersion(),
+                            newHolder.pendingVersion(),
+                            newHolder.acceptedVersion()))
+                        break;
+
+                    if (metaLocCache.replace(typeId, oldHolder, newHolder))
+                        break;
+                }
+
+                fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+            }
+            catch (IgniteCheckedException e) {
+                fut.onDone(MetadataUpdateResult.createFailureResult(new BinaryObjectException(e)));
+            }
+        }
+    }
+
+    /**
+     * Checks whether arrived metadata is obsolete compared to the one from local cache. Versions are
+     * compared as (pendingVersion, acceptedVersion) pairs, pending version first.
+     *
+     * @param locP pendingVersion of metadata from local cache.
+     * @param locA acceptedVersion of metadata from local cache.
+     * @param remP pendingVersion of metadata from arrived message (client response/proposed/accepted).
+     * @param remA acceptedVersion of metadata from arrived message (client response/proposed/accepted).
+     * @return {@code true} if the arrived versions are strictly older than the local ones,
+     *      {@code false} otherwise (including when they are equal).
+     */
+    private static boolean obsoleteUpdate(int locP, int locA, int remP, int remA) {
+        // Pending versions decide unless they are equal.
+        if (remP != locP)
+            return remP < locP;
+
+        return remA < locA;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java
new file mode 100644
index 0000000..08e879a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import org.apache.ignite.internal.binary.BinaryMetadata;
+
+/**
+ * Interface allows any component to register for events of binary metadata updates.
+ */
+public interface BinaryMetadataUpdatedListener {
+    /**
+     * Called when binary metadata has been updated.
+     *
+     * @param metadata Updated metadata.
+     */
+    public void binaryMetadataUpdated(BinaryMetadata metadata);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
index 7cc7a29..1564ff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
@@ -71,6 +71,15 @@ public interface CacheObjectBinaryProcessor extends IgniteCacheObjectProcessor {
      */
     @Nullable public BinaryType metadata(int typeId) throws IgniteException;
 
+
+    /**
+     * @param typeId Type ID.
+     * @param schemaId Schema ID.
+     * @return Meta data.
+     * @throws IgniteException In case of error.
+     */
+    @Nullable public BinaryType metadata(int typeId, int schemaId) throws IgniteException;
+
     /**
      * @param typeIds Type ID.
      * @return Meta data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 6e5940f..50a9f44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -17,25 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache.binary;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import javax.cache.Cache;
+import java.util.concurrent.ConcurrentMap;
 import javax.cache.CacheException;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.event.EventType;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -45,9 +35,7 @@ import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.binary.BinaryTypeConfiguration;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.Event;
@@ -68,45 +56,37 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller;
 import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridMapEntry;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T1;
 import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
 
 /**
  * Binary processor implementation.
@@ -114,25 +94,7 @@ import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements
     CacheObjectBinaryProcessor {
     /** */
-    private final CountDownLatch startLatch = new CountDownLatch(1);
-
-    /** */
-    private final boolean clientNode;
-
-    /** */
-    private volatile IgniteCacheProxy<BinaryMetadataKey, BinaryMetadata> metaDataCache;
-
-    /** */
-    private final ConcurrentHashMap8<Integer, BinaryTypeImpl> clientMetaDataCache;
-
-    /** Predicate to filter binary meta data in utility cache. */
-    private final CacheEntryPredicate metaPred = new CacheEntryPredicateAdapter() {
-        private static final long serialVersionUID = 0L;
-
-        @Override public boolean apply(GridCacheEntryEx e) {
-            return e.key().value(e.context().cacheObjectContext(), false) instanceof BinaryMetadataKey;
-        }
-    };
+    private volatile boolean discoveryStarted;
 
     /** */
     private BinaryContext binaryCtx;
@@ -151,11 +113,16 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     private final GridLocalEventListener clientDisconLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
             binaryContext().unregisterBinarySchemas();
+
+            metadataLocCache.clear();
         }
     };
 
-    /** Metadata updates collected before metadata cache is initialized. */
-    private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
+    /** Locally cached metadata. This local cache is managed by exchanging discovery custom events. */
+    private final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache = new ConcurrentHashMap8<>();
+
+    /** */
+    private BinaryMetadataTransport transport;
 
     /** Cached affinity key field names. */
     private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields = new ConcurrentHashMap<>();
@@ -167,10 +134,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         super(ctx);
 
         marsh = ctx.grid().configuration().getMarshaller();
-
-        clientNode = this.ctx.clientNode();
-
-        clientMetaDataCache = clientNode ? new ConcurrentHashMap8<Integer, BinaryTypeImpl>() : null;
     }
 
     /** {@inheritDoc} */
@@ -179,47 +142,38 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
             if (ctx.clientNode())
                 ctx.event().addLocalEventListener(clientDisconLsnr, EVT_CLIENT_NODE_DISCONNECTED);
 
+            transport = new BinaryMetadataTransport(metadataLocCache, ctx, log);
+
             BinaryMetadataHandler metaHnd = new BinaryMetadataHandler() {
                 @Override public void addMeta(int typeId, BinaryType newMeta) throws BinaryObjectException {
                     assert newMeta != null;
                     assert newMeta instanceof BinaryTypeImpl;
 
-                    BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
+                    if (!discoveryStarted) { // Before kernal start: only merge into the local cache.
+                        BinaryMetadataHolder holder = metadataLocCache.get(typeId);
 
-                    if (metaDataCache == null) {
-                        BinaryMetadata oldMeta = metaBuf.get(typeId);
-                        BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+                        BinaryMetadata oldMeta = holder != null ? holder.metadata() : null;
 
-                        if (oldMeta != mergedMeta) {
-                            synchronized (this) {
-                                mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+                        BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, ((BinaryTypeImpl)newMeta).metadata());
 
-                                if (oldMeta != mergedMeta)
-                                    metaBuf.put(typeId, mergedMeta);
-                                else
-                                    return;
-                            }
+                        if (oldMeta != mergedMeta)
+                            metadataLocCache.put(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0)); // Replace, so a merge into an existing holder is not lost.
 
-                            if (metaDataCache == null)
-                                return;
-                            else
-                                metaBuf.remove(typeId);
-                        }
-                        else
-                            return;
+                        return;
                     }
 
-                    assert metaDataCache != null;
+                    BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
 
                     CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(binaryCtx));
                 }
 
                 @Override public BinaryType metadata(int typeId) throws BinaryObjectException {
-                    if (metaDataCache == null)
-                        U.awaitQuiet(startLatch);
-
                     return CacheObjectBinaryProcessorImpl.this.metadata(typeId);
                 }
+
+                @Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+                    return CacheObjectBinaryProcessorImpl.this.metadata(typeId, schemaId);
+                }
             };
 
             BinaryMarshaller bMarsh0 = (BinaryMarshaller)marsh;
@@ -265,111 +219,34 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         }
     }
 
+    /**
+     * @param lsnr Listener handed over to the metadata transport; ignored if the transport is not created yet.
+     */
+    public void addBinaryMetadataUpdateListener(BinaryMetadataUpdatedListener lsnr) {
+        if (transport != null)
+            transport.addBinaryMetadataUpdateListener(lsnr);
+    }
+
     /** {@inheritDoc} */
     @Override public void stop(boolean cancel) {
         if (ctx.clientNode())
             ctx.event().removeLocalEventListener(clientDisconLsnr);
-    }
 
-    /** {@inheritDoc} */
-    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
-        if (clientNode && !ctx.isDaemon()) {
-            ctx.continuous().registerStaticRoutine(
-                CU.UTILITY_CACHE_NAME,
-                new MetaDataEntryListener(),
-                new MetaDataEntryFilter(),
-                null);
-        }
+        if (transport != null)
+            transport.stop();
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
-        IgniteCacheProxy<Object, Object> proxy = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
-
-        boolean old = proxy.context().deploy().ignoreOwnership(true);
-
-        try {
-            metaDataCache = (IgniteCacheProxy)proxy.withNoRetries();
-        }
-        finally {
-            proxy.context().deploy().ignoreOwnership(old);
-        }
-
-        if (clientNode) {
-            assert !metaDataCache.context().affinityNode();
-
-            while (true) {
-                ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
-
-                if (oldestSrvNode == null)
-                    break;
-
-                GridCacheQueryManager qryMgr = metaDataCache.context().queries();
-
-                CacheQuery<Map.Entry<BinaryMetadataKey, BinaryMetadata>> qry =
-                    qryMgr.createScanQuery(new MetaDataPredicate(), null, false);
-
-                qry.keepAll(false);
-
-                qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
-
-                try (GridCloseableIterator<Map.Entry<BinaryMetadataKey, BinaryMetadata>> entries = qry.executeScanQuery()) {
-                    for (Map.Entry<BinaryMetadataKey, BinaryMetadata> e : entries) {
-                        assert e.getKey() != null : e;
-                        assert e.getValue() != null : e;
-
-                        addClientCacheMetaData(e.getKey(), e.getValue());
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    if (!ctx.discovery().alive(oldestSrvNode) || !ctx.discovery().pingNode(oldestSrvNode.id()))
-                        continue;
-                    else
-                        throw e;
-                }
-                catch (CacheException e) {
-                    if (X.hasCause(e, ClusterTopologyCheckedException.class, ClusterTopologyException.class))
-                        continue;
-                    else
-                        throw e;
-                }
-
-                break;
-            }
-        }
-
-        for (Map.Entry<Integer, BinaryMetadata> e : metaBuf.entrySet())
-            addMeta(e.getKey(), e.getValue().wrap(binaryCtx));
-
-        metaBuf.clear();
-
-        startLatch.countDown();
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        if (transport != null)
+            transport.onDisconnected();
     }
 
-    /**
-     * @param key Metadata key.
-     * @param newMeta Metadata.
-     */
-    private void addClientCacheMetaData(BinaryMetadataKey key, final BinaryMetadata newMeta) {
-        int key0 = key.typeId();
-
-        clientMetaDataCache.compute(key0, new ConcurrentHashMap8.BiFun<Integer, BinaryTypeImpl, BinaryTypeImpl>() {
-            @Override public BinaryTypeImpl apply(Integer key, BinaryTypeImpl oldMeta) {
-                BinaryMetadata res;
-
-                BinaryMetadata oldMeta0 = oldMeta != null ? oldMeta.metadata() : null;
-
-                try {
-                    res = BinaryUtils.mergeMetadata(oldMeta0, newMeta);
-                }
-                catch (BinaryObjectException ignored) {
-                    res = oldMeta0;
-                }
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
 
-                return res != null ? res.wrap(binaryCtx) : null;
-            }
-        });
+        discoveryStarted = true;
     }
 
     /** {@inheritDoc} */
@@ -383,7 +260,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /**
      * @param obj Object.
      * @return Bytes.
-     * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+     * @throws BinaryObjectException If failed.
      */
     public byte[] marshal(@Nullable Object obj) throws BinaryObjectException {
         byte[] arr = binaryMarsh.marshal(obj);
@@ -397,7 +274,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
      * @param ptr Off-heap pointer.
      * @param forceHeap If {@code true} creates heap-based object.
      * @return Object.
-     * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+     * @throws BinaryObjectException If failed.
      */
     public Object unmarshal(long ptr, boolean forceHeap) throws BinaryObjectException {
         assert ptr > 0 : ptr;
@@ -536,54 +413,94 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
         BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
 
-        final BinaryMetadataKey key = new BinaryMetadataKey(typeId);
-
         try {
-            BinaryMetadata oldMeta = metaDataCache.localPeek(key);
-            BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+            BinaryMetadataHolder metaHolder = metadataLocCache.get(typeId);
 
-            AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);
+            BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
 
-            if (topVer == null)
-                topVer = ctx.cache().context().exchange().readyAffinityVersion();
+            BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
 
-            BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta));
+            MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
 
-            if (err != null)
-                throw err;
+            assert res != null;
+
+            if (res.rejected())
+                throw res.error();
         }
-        catch (CacheException e) {
+        catch (IgniteCheckedException e) {
             throw new BinaryObjectException("Failed to update meta data for type: " + newMeta.typeName(), e);
         }
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public BinaryType metadata(final int typeId) throws BinaryObjectException {
-        try {
-            if (clientNode) {
-                BinaryType typeMeta = clientMetaDataCache.get(typeId);
+    @Nullable @Override public BinaryType metadata(final int typeId) {
+        BinaryMetadataHolder holder = metadataLocCache.get(typeId);
+
+        if (holder == null) {
+            if (ctx.clientNode()) {
+                try {
+                    transport.requestUpToDateMetadata(typeId).get();
 
-                if (typeMeta != null)
-                    return typeMeta;
+                    holder = metadataLocCache.get(typeId);
+                }
+                catch (IgniteCheckedException ignored) {
+                    // Request failed: holder stays null, so null is returned below.
+                }
+            }
+        }
 
-                BinaryMetadata meta = metaDataCache.getTopologySafe(new BinaryMetadataKey(typeId));
+        if (holder != null) {
+            if (holder.pendingVersion() - holder.acceptedVersion() > 0) {
+                GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(typeId, holder.pendingVersion());
 
-                return meta != null ? meta.wrap(binaryCtx) : null;
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException ignored) {
+                    // Waiting failed: the metadata of the holder read above is still returned.
+                }
             }
-            else {
-                BinaryMetadataKey key = new BinaryMetadataKey(typeId);
 
-                BinaryMetadata meta = metaDataCache.localPeek(key);
+            return holder.metadata().wrap(binaryCtx);
+        }
+        else
+            return null;
+    }
 
-                if (meta == null && !metaDataCache.context().preloader().syncFuture().isDone())
-                    meta = metaDataCache.getTopologySafe(key);
+    /** {@inheritDoc} */
+    @Nullable @Override public BinaryType metadata(final int typeId, final int schemaId) {
+        BinaryMetadataHolder holder = metadataLocCache.get(typeId);
+
+        if (ctx.clientNode()) {
+            if (holder == null || !holder.metadata().hasSchema(schemaId)) {
+                try {
+                    transport.requestUpToDateMetadata(typeId).get();
 
-                return meta != null ? meta.wrap(binaryCtx) : null;
+                    holder = metadataLocCache.get(typeId);
+                }
+                catch (IgniteCheckedException ignored) {
+                    // Request failed: fall through with whatever holder was found locally.
+                }
             }
         }
-        catch (CacheException e) {
-            throw new BinaryObjectException(e);
+        else if (holder != null) { // Type unknown on this server node: nothing to wait for, null is returned.
+            if (holder.pendingVersion() - holder.acceptedVersion() > 0) {
+                GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
+                        typeId,
+                        holder.pendingVersion());
+
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException ignored) {
+                    // Waiting failed: the local cache is re-read below anyway.
+                }
+
+                holder = metadataLocCache.get(typeId);
+            }
         }
+
+        return holder != null ? holder.metadata().wrap(binaryCtx) : null;
     }
 
     /** {@inheritDoc} */
@@ -595,12 +512,10 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
             for (Integer typeId : typeIds)
                 keys.add(new BinaryMetadataKey(typeId));
 
-            Map<BinaryMetadataKey, BinaryMetadata> meta = metaDataCache.getAll(keys);
-
-            Map<Integer, BinaryType> res = U.newHashMap(meta.size());
+            Map<Integer, BinaryType> res = U.newHashMap(metadataLocCache.size());
 
-            for (Map.Entry<BinaryMetadataKey, BinaryMetadata> e : meta.entrySet())
-                res.put(e.getKey().typeId(), e.getValue().wrap(binaryCtx));
+            for (Map.Entry<Integer, BinaryMetadataHolder> e : metadataLocCache.entrySet())
+                res.put(e.getKey(), e.getValue().metadata().wrap(binaryCtx));
 
             return res;
         }
@@ -612,22 +527,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public Collection<BinaryType> metadata() throws BinaryObjectException {
-        if (clientNode)
-            return F.viewReadOnly(clientMetaDataCache.values(), new IgniteClosure<BinaryTypeImpl, BinaryType>() {
-                @Override public BinaryType apply(BinaryTypeImpl meta) {
-                    return meta;
-                }
-            });
-        else {
-            return F.viewReadOnly(metaDataCache.entrySetx(metaPred),
-                new C1<Cache.Entry<BinaryMetadataKey, BinaryMetadata>, BinaryType>() {
-                    private static final long serialVersionUID = 0L;
-
-                    @Override public BinaryType apply(Cache.Entry<BinaryMetadataKey, BinaryMetadata> e) {
-                        return e.getValue().wrap(binaryCtx);
-                    }
-                });
-        }
+        return F.viewReadOnly(metadataLocCache.values(), new IgniteClosure<BinaryMetadataHolder, BinaryType>() {
+            @Override public BinaryType apply(BinaryMetadataHolder metaHolder) {
+                return metaHolder.metadata().wrap(binaryCtx);
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -907,127 +811,33 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         return null;
     }
 
-    /**
-     * Processor responsible for metadata update.
-     */
-    private static class MetadataProcessor
-        implements EntryProcessor<BinaryMetadataKey, BinaryMetadata, BinaryObjectException>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private BinaryMetadata newMeta;
-
-        /**
-         * For {@link Externalizable}.
-         */
-        public MetadataProcessor() {
-            // No-op.
-        }
-
-        /**
-         * @param newMeta New metadata.
-         */
-        private MetadataProcessor(BinaryMetadata newMeta) {
-            assert newMeta != null;
-
-            this.newMeta = newMeta;
-        }
-
-        /** {@inheritDoc} */
-        @Override public BinaryObjectException process(MutableEntry<BinaryMetadataKey, BinaryMetadata> entry,
-            Object... args) {
-            try {
-                BinaryMetadata oldMeta = entry.getValue();
-
-                BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta);
-
-                if (mergedMeta != oldMeta)
-                    entry.setValue(mergedMeta);
-
-                return null;
-            }
-            catch (BinaryObjectException e) {
-                return e;
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(newMeta);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            newMeta = (BinaryMetadata)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MetadataProcessor.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    class MetaDataEntryListener implements CacheEntryUpdatedListener<BinaryMetadataKey, BinaryMetadata> {
-        /** {@inheritDoc} */
-        @Override public void onUpdated(
-            Iterable<CacheEntryEvent<? extends BinaryMetadataKey, ? extends BinaryMetadata>> evts)
-            throws CacheEntryListenerException {
-            for (CacheEntryEvent<? extends BinaryMetadataKey, ? extends BinaryMetadata> evt : evts) {
-                assert evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED : evt;
-
-                BinaryMetadataKey key = evt.getKey();
-
-                final BinaryMetadata newMeta = evt.getValue();
-
-                assert newMeta != null : evt;
-
-                addClientCacheMetaData(key, newMeta);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MetaDataEntryListener.class, this);
-        }
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return BINARY_PROC;
     }
 
-    /**
-     *
-     */
-    static class MetaDataEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (!dataBag.commonDataCollectedFor(BINARY_PROC.ordinal())) {
+            Map<Integer, BinaryMetadataHolder> res = U.newHashMap(metadataLocCache.size());
 
-        /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
-            return evt.getKey() instanceof BinaryMetadataKey;
-        }
+            for (Map.Entry<Integer,BinaryMetadataHolder> e : metadataLocCache.entrySet())
+                res.put(e.getKey(), e.getValue());
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MetaDataEntryFilter.class, this);
+            dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable) res);
         }
     }
 
-    /**
-     *
-     */
-    static class MetaDataPredicate implements IgniteBiPredicate<Object, Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        Map<Integer, BinaryMetadataHolder> receivedData = (Map<Integer, BinaryMetadataHolder>) data.commonData();
 
-        /** {@inheritDoc} */
-        @Override public boolean apply(Object key, Object val) {
-            return key instanceof BinaryMetadataKey;
-        }
+        if (receivedData != null) {
+            for (Map.Entry<Integer, BinaryMetadataHolder> e : receivedData.entrySet()) {
+                BinaryMetadataHolder holder = e.getValue();
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MetaDataPredicate.class, this);
+                metadataLocCache.put(e.getKey(), new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), holder.pendingVersion()));
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java
new file mode 100644
index 0000000..658e9da
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future is responsible for requesting up-to-date metadata from any server node in cluster
+ * and for blocking thread on client node until response arrives.
+ *
+ * It can cope with situation if node currently requested for the metadata leaves cluster;
+ * in that case future simply re-requests metadata from the next node available in topology.
+ */
+final class ClientMetadataRequestFuture extends GridFutureAdapter<MetadataUpdateResult> {
+    /** Reference used to initialize the shared logger. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Shared logger, initialized by the first constructed future. */
+    private static IgniteLogger log;
+
+    /** IO manager used to send metadata requests. */
+    private final GridIoManager ioMgr;
+
+    /** Discovery manager used to list alive server nodes and to check that a node is still known. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** ID of the binary type whose metadata is requested. */
+    private final int typeId;
+
+    /** Map of ongoing requests; the entry for {@link #typeId} is removed once this future is done. */
+    private final Map<Integer, ClientMetadataRequestFuture> syncMap;
+
+    /** Server nodes that have not been tried yet (snapshot taken at construction time). */
+    private final Queue<ClusterNode> aliveSrvNodes;
+
+    /** Node the latest request was sent to; accessed under {@code synchronized (this)}. */
+    private ClusterNode pendingNode;
+
+    /**
+     * @param ctx Context.
+     * @param typeId ID of the binary type to request metadata for.
+     * @param syncMap Map to store futures for ongoing requests.
+     */
+    ClientMetadataRequestFuture(
+            GridKernalContext ctx,
+            int typeId,
+            Map<Integer, ClientMetadataRequestFuture> syncMap
+    ) {
+        ioMgr = ctx.io();
+        discoMgr = ctx.discovery();
+        aliveSrvNodes = new LinkedList<>(discoMgr.aliveServerNodes());
+
+        this.typeId = typeId;
+        this.syncMap = syncMap;
+
+        if (log == null)
+            log = U.logger(ctx, logRef, ClientMetadataRequestFuture.class);
+    }
+
+    /** Sends the request to the next untried server node; completes with a failure result if none is left. */
+    void requestMetadata() {
+        boolean noSrvsInCluster;
+
+        synchronized (this) {
+            while (!aliveSrvNodes.isEmpty()) {
+                ClusterNode srvNode = aliveSrvNodes.poll();
+
+                try {
+                    ioMgr.sendToGridTopic(srvNode,
+                            GridTopic.TOPIC_METADATA_REQ,
+                            new MetadataRequestMessage(typeId),
+                            GridIoPolicy.SYSTEM_POOL);
+
+                    if (discoMgr.node(srvNode.id()) == null)
+                        continue;
+
+                    pendingNode = srvNode;
+
+                    break;
+                }
+                catch (IgniteCheckedException ignored) {
+                    U.warn(log, "Failed to request binary metadata from remote node " +
+                            "(proceeding with the next one): " + srvNode);
+                }
+            }
+
+            noSrvsInCluster = pendingNode == null;
+        }
+
+        if (noSrvsInCluster)
+            onDone(MetadataUpdateResult.createFailureResult(
+                    new BinaryObjectException(
+                            "All server nodes have left grid, cannot request metadata [typeId: "
+                                    + typeId + "]")));
+    }
+
+    /**
+     * If left node is the one latest metadata request was sent to,
+     * request is sent again to the next node in topology.
+     *
+     * @param leftNodeId ID of left node.
+     */
+    void onNodeLeft(UUID leftNodeId) {
+        boolean reqAgain = false;
+
+        synchronized (this) {
+            if (pendingNode != null && pendingNode.id().equals(leftNodeId)) {
+                aliveSrvNodes.remove(pendingNode);
+
+                pendingNode = null;
+
+                reqAgain = true;
+            }
+        }
+
+        if (reqAgain)
+            requestMetadata();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable MetadataUpdateResult res, @Nullable Throwable err) {
+        assert res != null;
+
+        boolean done = super.onDone(res, err);
+
+        if (done)
+            syncMap.remove(typeId);
+
+        return done;
+    }
+}