You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/24 14:12:33 UTC

[43/50] [abbrv] ignite git commit: Merge branch master ignite-2.0 to ignite-3477

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index d746d54,546be37..00a4b2e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -507,10 -482,10 +490,10 @@@ public abstract class GridCacheQueryMan
       * @throws IgniteCheckedException Thrown in case of any errors.
       */
      @SuppressWarnings("SimplifiableIfStatement")
 -    public void remove(CacheObject key, CacheObject val) throws IgniteCheckedException {
 +    public void remove(KeyCacheObject key, int partId, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException {
          assert key != null;
  
-         if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal))
+         if (!QueryUtils.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal))
              return; // No-op.
  
          if (!enterBusy())

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f334b84,dc4e52f..a59ff51
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -36,12 -30,8 +30,12 @@@ import javax.cache.expiry.Duration
  import javax.cache.expiry.ExpiryPolicy;
  import javax.cache.processor.EntryProcessor;
  import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
  import org.apache.ignite.internal.IgniteInternalFuture;
- import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 +import org.apache.ignite.internal.pagemem.wal.StorageException;
 +import org.apache.ignite.internal.pagemem.wal.WALPointer;
 +import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 +import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
  import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@@ -1142,304 -963,165 +995,163 @@@ public abstract class IgniteTxLocalAdap
  
      /**
       * @param cacheCtx Cache context.
-      * @param keys Key to enlist.
-      * @param expiryPlc Explicitly specified expiry policy for entry.
-      * @param map Return map.
-      * @param missed Map of missed keys.
-      * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
-      * @param deserializeBinary Deserialize binary flag.
-      * @param skipVals Skip values flag.
-      * @param keepCacheObjects Keep cache objects flag.
-      * @param skipStore Skip store flag.
-      * @throws IgniteCheckedException If failed.
-      * @return Enlisted keys.
+      * @param keys Keys.
+      * @return Expiry policy.
       */
-     @SuppressWarnings({"RedundantTypeArguments"})
-     private <K, V> Collection<KeyCacheObject> enlistRead(
-         final GridCacheContext cacheCtx,
-         @Nullable AffinityTopologyVersion entryTopVer,
-         Collection<KeyCacheObject> keys,
-         @Nullable ExpiryPolicy expiryPlc,
-         Map<K, V> map,
-         Map<KeyCacheObject, GridCacheVersion> missed,
-         int keysCnt,
-         boolean deserializeBinary,
-         boolean skipVals,
-         boolean keepCacheObjects,
-         boolean skipStore,
-         final boolean needVer,
-         final boolean recovery
-     ) throws IgniteCheckedException {
-         assert !F.isEmpty(keys);
-         assert keysCnt == keys.size();
- 
-         cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
- 
-         boolean single = keysCnt == 1;
- 
-         Collection<KeyCacheObject> lockKeys = null;
- 
-         AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
- 
-         boolean needReadVer = (serializable() && optimistic()) || needVer;
- 
-         // In this loop we cover only read-committed or optimistic transactions.
-         // Transactions that are pessimistic and not read-committed are covered
-         // outside of this loop.
-         for (KeyCacheObject key : keys) {
-             if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
-                 addActiveCache(cacheCtx, recovery);
- 
-             IgniteTxKey txKey = cacheCtx.txKey(key);
- 
-             // Check write map (always check writes first).
-             IgniteTxEntry txEntry = entry(txKey);
- 
-             // Either non-read-committed or there was a previous write.
-             if (txEntry != null) {
-                 CacheObject val = txEntry.value();
- 
-                 if (txEntry.hasValue()) {
-                     if (!F.isEmpty(txEntry.entryProcessors()))
-                         val = txEntry.applyEntryProcessors(val);
- 
-                     if (val != null) {
-                         GridCacheVersion ver = null;
+     protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+         return null;
+     }
  
-                         if (needVer) {
-                             if (txEntry.op() != READ)
-                                 ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
-                             else {
-                                 ver = txEntry.entryReadVersion();
+     /**
+      * Post lock processing for put or remove.
+      *
+      * @param cacheCtx Context.
+      * @param keys Keys.
+      * @param ret Return value.
+      * @param rmv {@code True} if remove.
+      * @param retval Flag to return value or not.
+      * @param read {@code True} if read.
+      * @param accessTtl TTL for read operation.
+      * @param filter Filter to check entries.
+      * @throws IgniteCheckedException If error.
+      * @param computeInvoke If {@code true} computes return value for invoke operation.
+      */
+     @SuppressWarnings("unchecked")
+     protected final void postLockWrite(
+         GridCacheContext cacheCtx,
+         Iterable<KeyCacheObject> keys,
+         GridCacheReturn ret,
+         boolean rmv,
+         boolean retval,
+         boolean read,
+         long accessTtl,
+         CacheEntryPredicate[] filter,
+         boolean computeInvoke
+     ) throws IgniteCheckedException {
+         for (KeyCacheObject k : keys) {
+             IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
  
-                                 if (ver == null && pessimistic()) {
-                                     while (true) {
-                                         try {
-                                             GridCacheEntryEx cached = txEntry.cached();
+             if (txEntry == null)
+                 throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
+                     "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
  
-                                             ver = cached.isNear() ?
-                                                 ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+             while (true) {
+                 GridCacheEntryEx cached = txEntry.cached();
  
-                                             break;
-                                         }
-                                         catch (GridCacheEntryRemovedException ignored) {
-                                             txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
-                                         }
-                                     }
-                                 }
+                 try {
+                     assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
+                         "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
+                             ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
  
-                                 if (ver == null) {
-                                     assert optimistic() && repeatableRead() : this;
+                     if (log.isDebugEnabled())
+                         log.debug("Post lock write entry: " + cached);
  
-                                     ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
-                                 }
-                             }
+                     CacheObject v = txEntry.previousValue();
+                     boolean hasPrevVal = txEntry.hasPreviousValue();
  
-                             assert ver != null;
-                         }
+                     if (onePhaseCommit())
+                         filter = txEntry.filters();
  
-                         cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
-                             ver, 0, 0);
-                     }
-                 }
-                 else {
-                     assert txEntry.op() == TRANSFORM;
+                     // If we have user-passed filter, we must read value into entry for peek().
+                     if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
+                         retval = true;
  
-                     while (true) {
-                         try {
-                             GridCacheVersion readVer = null;
-                             EntryGetResult getRes = null;
+                     boolean invoke = txEntry.op() == TRANSFORM;
  
-                             Object transformClo =
-                                 (txEntry.op() == TRANSFORM &&
-                                     cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
-                                     F.first(txEntry.entryProcessors()) : null;
+                     if (retval || invoke) {
+                         if (!cacheCtx.isNear()) {
+                             if (!hasPrevVal) {
+                                 // For non-local cache should read from store after lock on primary.
+                                 boolean readThrough = cacheCtx.isLocal() &&
+                                     (invoke || cacheCtx.loadPreviousValue()) &&
+                                     !txEntry.skipStore();
  
-                             if (needVer) {
-                                 getRes = txEntry.cached().innerGetVersioned(
+                                 v = cached.innerGet(
                                      null,
                                      this,
-                                     /*update-metrics*/true,
-                                     /*event*/!skipVals,
 -                                    /*swap*/true,
+                                     readThrough,
+                                     /*metrics*/!invoke,
+                                     /*event*/!invoke && !dht(),
 -                                    /*temporary*/false,
                                      CU.subjectId(this, cctx),
-                                     transformClo,
-                                     resolveTaskName(),
-                                     null,
-                                     txEntry.keepBinary(),
-                                     null);
- 
-                                 if (getRes != null) {
-                                     val = getRes.value();
-                                     readVer = getRes.version();
-                                 }
-                             }
-                             else {
-                                 val = txEntry.cached().innerGet(
                                      null,
-                                     this,
-                                     /*read-through*/false,
-                                     /*metrics*/true,
-                                     /*event*/!skipVals,
-                                     CU.subjectId(this, cctx),
-                                     transformClo,
                                      resolveTaskName(),
                                      null,
                                      txEntry.keepBinary());
                              }
- 
-                             if (val != null) {
-                                 if (!readCommitted() && !skipVals)
-                                     txEntry.readValue(val);
- 
-                                 if (!F.isEmpty(txEntry.entryProcessors()))
-                                     val = txEntry.applyEntryProcessors(val);
- 
-                                 cacheCtx.addResult(map,
-                                     key,
-                                     val,
-                                     skipVals,
-                                     keepCacheObjects,
-                                     deserializeBinary,
-                                     false,
-                                     getRes,
-                                     readVer,
-                                     0,
-                                     0,
-                                     needVer);
-                             }
-                             else
-                                 missed.put(key, txEntry.cached().version());
- 
-                             break;
                          }
-                         catch (GridCacheEntryRemovedException ignored) {
-                             txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+                         else {
+                             if (!hasPrevVal)
 -                                v = cached.rawGetOrUnmarshal(false);
++                                v = cached.rawGet();
                          }
-                     }
-                 }
-             }
-             // First time access within transaction.
-             else {
-                 if (lockKeys == null && !skipVals)
-                     lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
  
-                 if (!single && !skipVals)
-                     lockKeys.add(key);
+                         if (txEntry.op() == TRANSFORM) {
+                             if (computeInvoke) {
+                                 GridCacheVersion ver;
+ 
+                                 try {
+                                     ver = cached.version();
+                                 }
+                                 catch (GridCacheEntryRemovedException e) {
+                                     assert optimistic() : txEntry;
  
-                 while (true) {
-                     GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+                                     if (log.isDebugEnabled())
+                                         log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
  
-                     try {
-                         GridCacheVersion ver = entry.version();
- 
-                         CacheObject val = null;
-                         GridCacheVersion readVer = null;
-                         EntryGetResult getRes = null;
- 
-                         if (!pessimistic() || readCommitted() && !skipVals) {
-                             IgniteCacheExpiryPolicy accessPlc =
-                                 optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
- 
-                             if (needReadVer) {
-                                 getRes = primaryLocal(entry) ?
-                                     entry.innerGetVersioned(
-                                         null,
-                                         this,
-                                         /*metrics*/true,
-                                         /*event*/true,
-                                         CU.subjectId(this, cctx),
-                                         null,
-                                         resolveTaskName(),
-                                         accessPlc,
-                                         !deserializeBinary,
-                                         null) : null;
- 
-                                 if (getRes != null) {
-                                     val = getRes.value();
-                                     readVer = getRes.version();
+                                     ver = null;
                                  }
-                             }
-                             else {
-                                 val = entry.innerGet(
-                                     null,
-                                     this,
-                                     /*no read-through*/false,
-                                     /*metrics*/true,
-                                     /*event*/true,
-                                     CU.subjectId(this, cctx),
-                                     null,
-                                     resolveTaskName(),
-                                     accessPlc,
-                                     !deserializeBinary);
-                             }
  
-                             if (val != null) {
-                                 cacheCtx.addResult(map,
-                                     key,
-                                     val,
-                                     skipVals,
-                                     keepCacheObjects,
-                                     deserializeBinary,
-                                     false,
-                                     getRes,
-                                     readVer,
-                                     0,
-                                     0,
-                                     needVer);
+                                 addInvokeResult(txEntry, v, ret, ver);
                              }
-                             else
-                                 missed.put(key, ver);
                          }
                          else
-                             // We must wait for the lock in pessimistic mode.
-                             missed.put(key, ver);
- 
-                         if (!readCommitted() && !skipVals) {
-                             txEntry = addEntry(READ,
-                                 val,
-                                 null,
-                                 null,
-                                 entry,
-                                 expiryPlc,
-                                 null,
-                                 true,
-                                 -1L,
-                                 -1L,
-                                 null,
-                                 skipStore,
-                                 !deserializeBinary);
- 
-                             // As optimization, mark as checked immediately
-                             // for non-pessimistic if value is not null.
-                             if (val != null && !pessimistic()) {
-                                 txEntry.markValid();
- 
-                                 if (needReadVer) {
-                                     assert readVer != null;
- 
-                                     txEntry.entryReadVersion(readVer);
-                                 }
-                             }
-                         }
- 
-                         break; // While.
+                             ret.value(cacheCtx, v, txEntry.keepBinary());
                      }
-                     catch (GridCacheEntryRemovedException ignored) {
+ 
+                     boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
+ 
+                     // For remove operation we return true only if we are removing s/t,
+                     // i.e. cached value is not null.
+                     ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
+ 
+                     if (onePhaseCommit())
+                         txEntry.filtersPassed(pass);
+ 
+                     boolean updateTtl = read;
+ 
+                     if (pass) {
+                         txEntry.markValid();
+ 
                          if (log.isDebugEnabled())
-                             log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+                             log.debug("Filter passed in post lock for key: " + k);
                      }
-                     finally {
-                         if (entry != null && readCommitted()) {
-                             if (cacheCtx.isNear()) {
-                                 if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
-                                     if (entry.markObsolete(xidVer))
-                                         cacheCtx.cache().removeEntry(entry);
-                                 }
-                             }
-                             else
-                                 entry.context().evicts().touch(entry, topVer);
+                     else {
+                         // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
+                         txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
+                         txEntry.filters(CU.empty0());
+                         txEntry.filtersSet(false);
+ 
+                         updateTtl = !cacheCtx.putIfAbsentFilter(filter);
+                     }
+ 
+                     if (updateTtl) {
+                         if (!read) {
+                             ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry);
+ 
+                             if (expiryPlc != null)
+                                 txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
                          }
+                         else
+                             txEntry.ttl(accessTtl);
                      }
+ 
+                     break; // While.
+                 }
+                 // If entry cached within transaction got removed before lock.
+                 catch (GridCacheEntryRemovedException ignore) {
+                     if (log.isDebugEnabled())
+                         log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
+ 
+                     txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion()));
                  }
              }
          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index eb3524b,e0549fb..4726d86
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@@ -35,10 -35,10 +35,10 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
  import org.apache.ignite.internal.processors.cache.GridCacheContext;
  import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
 +import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
  import org.apache.ignite.internal.processors.cache.KeyCacheObject;
  import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
- import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+ import org.apache.ignite.internal.processors.query.QueryUtils;
 -import org.apache.ignite.internal.util.GridUnsafe;
  import org.apache.ignite.internal.util.IgniteUtils;
  import org.apache.ignite.internal.util.typedef.internal.CU;
  import org.apache.ignite.internal.util.typedef.internal.U;
@@@ -160,13 -174,13 +160,13 @@@ public class IgniteCacheObjectProcessor
      }
  
      /** {@inheritDoc} */
 -    @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
 +    @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) throws IgniteCheckedException {
          switch (type) {
              case CacheObject.TYPE_BYTE_ARR:
 -                return new CacheObjectByteArrayImpl(bytes);
 +                throw new IllegalArgumentException("Byte arrays cannot be used as cache keys.");
  
              case CacheObject.TYPE_REGULAR:
-                 return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes);
 -                return new CacheObjectImpl(null, bytes);
++                return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes, -1);
          }
  
          throw new IllegalArgumentException("Invalid object type: " + type);
@@@ -299,8 -242,10 +299,8 @@@
      @Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException {
          assert ccfg != null;
  
 -        CacheMemoryMode memMode = ccfg.getMemoryMode();
 -
          boolean storeVal = !ccfg.isCopyOnRead() || (!isBinaryEnabled(ccfg) &&
-             (GridQueryProcessor.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled()));
+             (QueryUtils.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled()));
  
          CacheObjectContext res = new CacheObjectContext(ctx,
              ccfg.getName(),
@@@ -421,18 -362,10 +417,18 @@@
  
                      Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
  
-                     KeyCacheObjectImpl key = new KeyCacheObjectImpl(val, valBytes);
 -                    return new KeyCacheObjectImpl(val, valBytes, partition());
++                    KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
 +
 +                    key.partition(partition());
 +
 +                    return key;
                  }
  
-                 KeyCacheObjectImpl key = new KeyCacheObjectImpl(val, valBytes);
 -                return new KeyCacheObjectImpl(val, valBytes, partition());
++                KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
 +
 +                key.partition(partition());
 +
 +                return key;
              }
              catch (IgniteCheckedException e) {
                  throw new IgniteException("Failed to marshal object: " + val, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 124cb4b,cfe94b2..075fb06
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@@ -121,11 -139,30 +139,30 @@@ public class ClusterProcessor extends G
          }
      }
  
+ 
+     /**
+      * @param vals collection to seek through.
+      */
+     private Boolean findLastFlag(Collection<Serializable> vals) {
+         Boolean flag = null;
+ 
+         for (Serializable ser : vals) {
+             if (ser != null) {
+                 Map<String, Object> map = (Map<String, Object>) ser;
+ 
+                 if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
+                     flag = (Boolean) map.get(ATTR_UPDATE_NOTIFIER_STATUS);
+             }
+         }
+ 
+         return flag;
+     }
+ 
      /** {@inheritDoc} */
 -    @Override public void onKernalStart() throws IgniteCheckedException {
 +    @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
          if (notifyEnabled.get()) {
              try {
-                 verChecker = new GridUpdateNotifier(ctx.gridName(),
+                 verChecker = new GridUpdateNotifier(ctx.igniteInstanceName(),
                      VER_STR,
                      ctx.gateway(),
                      ctx.plugins().allProviders(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 87d54a1,0000000..e5919e0
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@@ -1,947 -1,0 +1,944 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cluster;
 +
- import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.atomic.AtomicReference;
 +import org.apache.ignite.Ignite;
 +import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteCompute;
 +import org.apache.ignite.IgniteException;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.cluster.ClusterNode;
 +import org.apache.ignite.configuration.CacheConfiguration;
 +import org.apache.ignite.events.DiscoveryEvent;
 +import org.apache.ignite.events.Event;
 +import org.apache.ignite.internal.GridKernalContext;
 +import org.apache.ignite.internal.IgniteInternalFuture;
 +import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 +import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
 +import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 +import org.apache.ignite.internal.processors.GridProcessorAdapter;
 +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
- import org.apache.ignite.internal.processors.cache.ClusterState;
 +import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
++import org.apache.ignite.internal.processors.cache.ClusterState;
 +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 +import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 +import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
 +import org.apache.ignite.internal.util.future.GridFinishedFuture;
 +import org.apache.ignite.internal.util.future.GridFutureAdapter;
 +import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 +import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 +import org.apache.ignite.internal.util.typedef.CI1;
 +import org.apache.ignite.internal.util.typedef.CI2;
 +import org.apache.ignite.internal.util.typedef.F;
 +import org.apache.ignite.internal.util.typedef.internal.CU;
 +import org.apache.ignite.internal.util.typedef.internal.S;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteFuture;
 +import org.apache.ignite.lang.IgniteRunnable;
 +import org.apache.ignite.resources.IgniteInstanceResource;
++import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 +import org.jetbrains.annotations.Nullable;
 +
 +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 +import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE;
 +import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE;
 +import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION;
 +
 +/**
 + *
 + */
 +public class GridClusterStateProcessor extends GridProcessorAdapter {
 +    /** Global status. */
 +    private volatile ClusterState globalState;
 +
 +    /** Action context. */
 +    private volatile ChangeGlobalStateContext lastCgsCtx;
 +
 +    /** Local action future. */
 +    private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>();
 +
 +    /** Process. */
 +    @GridToStringExclude
 +    private GridCacheProcessor cacheProc;
 +
 +    /** Shared context. */
 +    @GridToStringExclude
 +    private GridCacheSharedContext<?, ?> sharedCtx;
 +
 +    //todo may be add init latch
 +
 +    /** Listener. */
 +    private final GridLocalEventListener lsr = new GridLocalEventListener() {
 +        @Override public void onEvent(Event evt) {
 +            assert evt != null;
 +
 +            final DiscoveryEvent e = (DiscoveryEvent)evt;
 +
 +            assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
 +
 +            final GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +            if (f != null)
 +                f.initFut.listen(new CI1<IgniteInternalFuture>() {
 +                    @Override public void apply(IgniteInternalFuture fut) {
 +                        f.onDiscoveryEvent(e);
 +                    }
 +                });
 +        }
 +    };
 +
 +    /**
 +     * @param ctx Kernal context.
 +     */
 +    public GridClusterStateProcessor(GridKernalContext ctx) {
 +        super(ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
 +        super.start(activeOnStart);
 +
 +        globalState = activeOnStart ? ACTIVE : INACTIVE;
 +        cacheProc = ctx.cache();
 +        sharedCtx = cacheProc.context();
 +
 +        sharedCtx.io().addHandler(0,
 +            GridChangeGlobalStateMessageResponse.class,
 +            new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
 +                @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
 +                    processChangeGlobalStateResponse(nodeId, msg);
 +                }
 +            });
 +
 +        ctx.discovery().setCustomEventListener(
 +            ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() {
 +                @Override public void onCustomEvent(
 +                    AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) {
 +                    assert topVer != null;
 +                    assert snd != null;
 +                    assert msg != null;
 +
 +                    boolean activate = msg.activate();
 +
 +                    ChangeGlobalStateContext actx = lastCgsCtx;
 +
 +                    if (actx != null && globalState == TRANSITION) {
 +                        GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +                        if (log.isDebugEnabled())
 +                            log.debug("Concurrent " + prettyStr(activate) + " [id=" +
 +                                ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]");
 +
 +                        if (f != null && f.requestId.equals(msg.requestId()))
 +                            f.onDone(new IgniteCheckedException(
 +                                "Concurrent change state, now in progress=" + (activate)
 +                                    + ", initiatingNodeId=" + actx.initiatingNodeId
 +                                    + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId()
 +                            ));
 +
 +                        msg.concurrentChangeState();
 +                    }
 +                    else {
 +                        if (log.isInfoEnabled())
 +                            log.info("Create " + prettyStr(activate) + " context [id=" +
 +                                ctx.localNodeId() + " topVer=" + topVer + ", reqId=" +
 +                                msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]");
 +
 +                        lastCgsCtx = new ChangeGlobalStateContext(
 +                            msg.requestId(),
 +                            msg.initiatorNodeId(),
 +                            msg.getDynamicCacheChangeBatch(),
 +                            msg.activate());
 +
 +                        globalState = TRANSITION;
 +                    }
 +                }
 +            });
 +
 +        ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void stop(boolean cancel) throws IgniteCheckedException {
 +        super.stop(cancel);
 +
 +        sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class);
 +        ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 +
 +        IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
-             "Node is stopping: " + ctx.gridName());
++            "Node is stopping: " + ctx.igniteInstanceName());
 +
 +        GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +        if (f != null)
 +            f.onDone(stopErr);
 +
 +        cgsLocFut.set(null);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
 +        return DiscoveryDataExchangeType.STATE_PROC;
 +    }
 +
 +    /** {@inheritDoc} */
-     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
-         return globalState;
++    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
++        dataBag.addGridCommonData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState);
 +    }
 +
 +    /** {@inheritDoc} */
-     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
-         if (ctx.localNodeId().equals(joiningNodeId))
-             globalState = (ClusterState)data;
++    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
++        globalState = (ClusterState)data.commonData();
 +    }
 +
 +    /**
 +     *
 +     */
 +    public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
 +        if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
 +            throw new IgniteException("Cannot " + prettyStr(activate) + " cluster, because cache locked on transaction.");
 +
-         if ((this.globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate))
++        if ((globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate))
 +            return new GridFinishedFuture<>();
 +
 +        final UUID requestId = UUID.randomUUID();
 +
 +        final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx);
 +
 +        if (!cgsLocFut.compareAndSet(null, cgsFut)) {
 +            GridChangeGlobalStateFuture locF = cgsLocFut.get();
 +
 +            if (locF.activate == activate)
 +                return locF;
 +            else
 +                return new GridFinishedFuture<>(new IgniteException(
 +                    "fail " + prettyStr(activate) + ", because now in progress" + prettyStr(locF.activate)));
 +        }
 +
 +        try {
 +            if (ctx.clientNode()) {
 +                AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 +
 +                IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers())
 +                    .compute().withAsync();
 +
 +                if (log.isInfoEnabled())
 +                    log.info("Send " + prettyStr(activate) + " request from client node [id=" +
 +                        ctx.localNodeId() + " topVer=" + topVer + " ]");
 +
 +                comp.run(new ClientChangeGlobalStateComputeRequest(activate));
 +
 +                comp.future().listen(new CI1<IgniteFuture>() {
 +                    @Override public void apply(IgniteFuture fut) {
 +                        try {
 +                            fut.get();
 +
 +                            cgsFut.onDone();
 +                        }
 +                        catch (Exception e) {
 +                            cgsFut.onDone(e);
 +                        }
 +                    }
 +                });
 +            }
 +            else {
 +                List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 +
 +                DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
 +                    requestId, null, ctx.localNodeId());
 +
 +                changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
 +
 +                reqs.add(changeGlobalStateReq);
 +
 +                reqs.addAll(activate ? cacheProc.startAllCachesRequests() : cacheProc.stopAllCachesRequests());
 +
 +                ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
 +                    requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs));
 +
 +                try {
 +                    ctx.discovery().sendCustomEvent(changeGlobalStateMsg);
 +
 +                    if (ctx.isStopping())
 +                        cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
 +                            "node is stopping."));
 +                }
 +                catch (IgniteCheckedException e) {
 +                    log.error("Fail create or send change global state request." + cgsFut, e);
 +
 +                    cgsFut.onDone(e);
 +                }
 +            }
 +        }
 +        catch (IgniteCheckedException e) {
 +            log.error("Fail create or send change global state request." + cgsFut, e);
 +
 +            cgsFut.onDone(e);
 +        }
 +
 +        return cgsFut;
 +    }
 +
 +    /**
 +     *
 +     */
 +    public boolean active() {
 +        ChangeGlobalStateContext actx = lastCgsCtx;
 +
 +        if (actx != null && !actx.activate && globalState == TRANSITION)
 +            return true;
 +
 +        if (actx != null && actx.activate && globalState == TRANSITION)
 +            return false;
 +
 +        return globalState == ACTIVE;
 +    }
 +
 +    /**
 +     * @param reqs Requests.
 +     */
 +    public boolean changeGlobalState(
 +        Collection<DynamicCacheChangeRequest> reqs,
 +        AffinityTopologyVersion topVer
 +    ) {
 +        assert !F.isEmpty(reqs);
 +        assert topVer != null;
 +
 +        for (DynamicCacheChangeRequest req : reqs)
 +            if (req.globalStateChange()) {
 +                ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 +
 +                assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
 +
 +                cgsCtx.topologyVersion(topVer);
 +
 +                return true;
 +            }
 +
 +
 +        return false;
 +    }
 +
 +    /**
 +     * Invoke from exchange future.
 +     */
 +    public Exception onChangeGlobalState() {
 +        GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +        ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 +
 +        assert cgsCtx != null;
 +
 +        if (f != null)
 +            f.setRemaining(cgsCtx.topVer);
 +
 +        return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx);
 +    }
 +
 +    /**
 +     * @param exs Exs.
 +     */
 +    public void onFullResponseMessage(Map<UUID, Exception> exs) {
 +        assert !F.isEmpty(exs);
 +
 +        ChangeGlobalStateContext actx = lastCgsCtx;
 +
 +        actx.setFail();
 +
 +        // revert change if activation request fail
 +        if (actx.activate) {
 +            try {
 +                cacheProc.onKernalStopCaches(true);
 +
 +                cacheProc.stopCaches(true);
 +
 +                sharedCtx.affinity().removeAllCacheInfo();
 +
 +                if (!ctx.clientNode()) {
 +                    sharedCtx.database().onDeActivate(ctx);
 +
 +                    if (sharedCtx.pageStore() != null)
 +                        sharedCtx.pageStore().onDeActivate(ctx);
 +
 +                    if (sharedCtx.wal() != null)
 +                        sharedCtx.wal().onDeActivate(ctx);
 +                }
 +            }
 +            catch (Exception e) {
 +                for (Map.Entry<UUID, Exception> entry : exs.entrySet())
 +                    e.addSuppressed(entry.getValue());
 +
 +                log.error("Fail while revert activation request changes", e);
 +            }
 +        }
 +        else {
 +            //todo revert change if deactivate request fail
 +        }
 +
 +        globalState = actx.activate ? INACTIVE : ACTIVE;
 +
 +        GridChangeGlobalStateFuture af = cgsLocFut.get();
 +
 +        if (af != null && af.requestId.equals(actx.requestId)) {
 +            IgniteCheckedException e = new IgniteCheckedException("see suppressed");
 +
 +            for (Map.Entry<UUID, Exception> entry : exs.entrySet())
 +                e.addSuppressed(entry.getValue());
 +
 +            af.onDone(e);
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private Exception onActivate(ChangeGlobalStateContext cgsCtx) {
 +        final boolean client = ctx.clientNode();
 +
 +        if (log.isInfoEnabled())
 +            log.info("Start activation process [nodeId=" + this.ctx.localNodeId() + ", client=" + client +
 +                ", topVer=" + cgsCtx.topVer + "]");
 +
 +        Collection<CacheConfiguration> cfgs = new ArrayList<>();
 +
 +        for (DynamicCacheChangeRequest req : cgsCtx.batch.requests()) {
 +            if (req.startCacheConfiguration() != null)
 +                cfgs.add(req.startCacheConfiguration());
 +        }
 +
 +        try {
 +            if (!client) {
 +                sharedCtx.database().lock();
 +
 +                IgnitePageStoreManager pageStore = sharedCtx.pageStore();
 +
 +                if (pageStore != null)
 +                    pageStore.onActivate(ctx);
 +
 +                if (sharedCtx.wal() != null)
 +                    sharedCtx.wal().onActivate(ctx);
 +
 +                sharedCtx.database().initDataBase();
 +
 +                for (CacheConfiguration cfg : cfgs) {
 +                    if (CU.isSystemCache(cfg.getName()))
 +                        if (pageStore != null)
 +                            pageStore.initializeForCache(cfg);
 +                }
 +
 +                for (CacheConfiguration cfg : cfgs) {
 +                    if (!CU.isSystemCache(cfg.getName()))
 +                        if (pageStore != null)
 +                            pageStore.initializeForCache(cfg);
 +                }
 +
 +                sharedCtx.database().onActivate(ctx);
 +            }
 +
 +            if (log.isInfoEnabled())
 +                log.info("Success activate wal, dataBase, pageStore [nodeId="
 +                    + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
 +
 +            return null;
 +        }
 +        catch (Exception e) {
 +            log.error("Fail activate wal, dataBase, pageStore [nodeId=" + ctx.localNodeId() + ", client=" + client +
 +                ", topVer=" + cgsCtx.topVer + "]", e);
 +
 +            if (!ctx.clientNode())
 +                sharedCtx.database().unLock();
 +
 +            return e;
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) {
 +        final boolean client = ctx.clientNode();
 +
 +        if (log.isInfoEnabled())
 +            log.info("Start deactivate process [id=" + ctx.localNodeId() + ", client=" +
 +                client + ", topVer=" + cgsCtx.topVer + "]");
 +
 +        try {
 +            ctx.dataStructures().onDeActivate(ctx);
 +
 +            ctx.service().onDeActivate(ctx);
 +
 +            if (log.isInfoEnabled())
 +                log.info("Success deactivate services, dataStructures, database, pageStore, wal [id=" + ctx.localNodeId() + ", client=" +
 +                    client + ", topVer=" + cgsCtx.topVer + "]");
 +
 +            return null;
 +        }
 +        catch (Exception e) {
 +            log.error("DeActivation fail [nodeId=" + ctx.localNodeId() + ", client=" + client +
 +                ", topVer=" + cgsCtx.topVer + "]", e);
 +
 +            return e;
 +        }
 +        finally {
 +            if (!client)
 +                sharedCtx.database().unLock();
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) {
 +        IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() {
 +            @Override public void run() {
 +                boolean client = ctx.clientNode();
 +
 +                Exception e = null;
 +
 +                try {
-                     ctx.marshallerContext().onMarshallerCacheStarted(ctx);
- 
 +                    if (!ctx.config().isDaemon())
 +                        ctx.cacheObjects().onUtilityCacheStarted();
 +
 +                    ctx.service().onUtilityCacheStarted();
 +
 +                    ctx.service().onActivate(ctx);
 +
 +                    ctx.dataStructures().onActivate(ctx);
 +
 +                    if (log.isInfoEnabled())
 +                        log.info("Success final activate [nodeId="
 +                            + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
 +                }
 +                catch (Exception ex) {
 +                    e = ex;
 +
 +                    log.error("Fail activate finished [nodeId=" + ctx.localNodeId() + ", client=" + client +
 +                        ", topVer=" + GridClusterStateProcessor.this.lastCgsCtx.topVer + "]", ex);
 +                }
 +                finally {
 +                    globalState = ACTIVE;
 +
 +                    sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e);
 +
 +                    GridClusterStateProcessor.this.lastCgsCtx = null;
 +                }
 +            }
 +        });
 +
 +        cgsCtx.setAsyncActivateFut(asyncActivateFut);
 +    }
 +
 +    /**
 +     *
 +     */
 +    public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) {
 +        final boolean client = ctx.clientNode();
 +
 +        if (log.isInfoEnabled())
 +            log.info("Success final deactivate [nodeId="
 +                + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
 +
 +        Exception ex = null;
 +
 +        try {
 +            if (!client) {
 +                sharedCtx.database().onDeActivate(ctx);
 +
 +                if (sharedCtx.pageStore() != null)
 +                    sharedCtx.pageStore().onDeActivate(ctx);
 +
 +                if (sharedCtx.wal() != null)
 +                    sharedCtx.wal().onDeActivate(ctx);
 +
 +                sharedCtx.affinity().removeAllCacheInfo();
 +            }
 +        }
 +        catch (Exception e) {
 +            ex = e;
 +        }
 +        finally {
 +            globalState = INACTIVE;
 +        }
 +
 +        sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex);
 +
 +        this.lastCgsCtx = null;
 +    }
 +
 +    /**
 +     *
 +     */
 +    public void onExchangeDone() {
 +        ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 +
 +        assert cgsCtx != null;
 +
 +        if (!cgsCtx.isFail()) {
 +            if (cgsCtx.activate)
 +                onFinalActivate(cgsCtx);
 +            else
 +                onFinalDeActivate(cgsCtx);
 +        }
 +        else
 +            lastCgsCtx = null;
 +    }
 +
 +    /**
 +     * @param initNodeId Initialize node id.
 +     * @param ex Exception.
 +     */
 +    private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) {
 +        assert requestId != null;
 +        assert initNodeId != null;
 +
 +        try {
 +            GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex);
 +
 +            if (log.isDebugEnabled())
 +                log.debug("Send change global state response [nodeId=" + ctx.localNodeId() +
 +                    ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]");
 +
 +            if (ctx.localNodeId().equals(initNodeId))
 +                processChangeGlobalStateResponse(ctx.localNodeId(), actResp);
 +            else
 +                sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL);
 +        }
 +        catch (IgniteCheckedException e) {
 +            log.error("Fail send change global state response to " + initNodeId, e);
 +        }
 +    }
 +
 +    /**
 +     * @param msg Message.
 +     */
 +    private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
 +        assert nodeId != null;
 +        assert msg != null;
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Received activation response [requestId=" + msg.getRequestId() +
 +                ", nodeId=" + nodeId + "]");
 +
 +        ClusterNode node = ctx.discovery().node(nodeId);
 +
 +        if (node == null) {
 +            U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" +
 +                msg.getRequestId() + ']');
 +
 +            return;
 +        }
 +
 +        UUID requestId = msg.getRequestId();
 +
 +        final GridChangeGlobalStateFuture fut = cgsLocFut.get();
 +
 +        if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) {
 +            fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
 +                @Override public void apply(IgniteInternalFuture<?> f) {
 +                    fut.onResponse(nodeId, msg);
 +                }
 +            });
 +        }
 +    }
 +
 +
 +
 +    /**
 +     * @param activate Activate.
 +     */
 +    private String prettyStr(boolean activate) {
 +        return activate ? "activate" : "deactivate";
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(GridClusterStateProcessor.class, this);
 +    }
 +
 +    /**
 +     *
 +     */
 +    private static class GridChangeGlobalStateFuture extends GridFutureAdapter {
 +        /** Request id. */
 +        @GridToStringInclude
 +        private final UUID requestId;
 +
 +        /** Activate. */
 +        private final boolean activate;
 +
 +        /** Nodes. */
 +        @GridToStringInclude
 +        private final Set<UUID> remaining = new HashSet<>();
 +
 +        /** Responses. */
 +        @GridToStringInclude
 +        private final Map<UUID, GridChangeGlobalStateMessageResponse> resps = new HashMap<>();
 +
 +        /** Context. */
 +        @GridToStringExclude
 +        private final GridKernalContext ctx;
 +
 +        /** */
 +        @GridToStringExclude
 +        private final Object mux = new Object();
 +
 +        /** */
 +        @GridToStringInclude
 +        private final GridFutureAdapter initFut = new GridFutureAdapter();
 +
 +        /** Grid logger. */
 +        @GridToStringExclude
 +        private final IgniteLogger log;
 +
 +        /**
 +         *
 +         */
 +        public GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
 +            this.requestId = requestId;
 +            this.activate = activate;
 +            this.ctx = ctx;
 +            this.log = ctx.log(getClass());
 +        }
 +
 +        /**
 +         * @param event Event.
 +         */
 +        public void onDiscoveryEvent(DiscoveryEvent event) {
 +            assert event != null;
 +
 +            if (isDone())
 +                return;
 +
 +            boolean allReceived = false;
 +
 +            synchronized (mux) {
 +                if (remaining.remove(event.eventNode().id()))
 +                    allReceived = remaining.isEmpty();
 +            }
 +
 +            if (allReceived)
 +                onAllReceived();
 +        }
 +
 +        /**
 +         *
 +         */
 +        public void setRemaining(AffinityTopologyVersion topVer) {
 +            Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
 +
 +            List<UUID> ids = new ArrayList<>(nodes.size());
 +
 +            for (ClusterNode n : nodes)
 +                ids.add(n.id());
 +
 +            if (log.isDebugEnabled())
 +                log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" +
 +                    ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() +
 +                    ", nodes=" + Arrays.toString(ids.toArray()) + "]");
 +
 +            synchronized (mux) {
 +                remaining.addAll(ids);
 +            }
 +
 +            initFut.onDone();
 +        }
 +
 +        /**
 +         * @param msg Activation message response.
 +         */
 +        public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
 +            assert msg != null;
 +
 +            if (isDone())
 +                return;
 +
 +            boolean allReceived = false;
 +
 +            synchronized (mux) {
 +                if (remaining.remove(nodeId))
 +                    allReceived = remaining.isEmpty();
 +
 +                resps.put(nodeId, msg);
 +            }
 +
 +            if (allReceived)
 +                onAllReceived();
 +        }
 +
 +        /**
 +         *
 +         */
 +        private void onAllReceived() {
 +            Throwable e = new Throwable();
 +
 +            boolean fail = false;
 +
 +            for (Map.Entry<UUID, GridChangeGlobalStateMessageResponse> entry : resps.entrySet()) {
 +                GridChangeGlobalStateMessageResponse r = entry.getValue();
 +
 +                if (r.getError() != null) {
 +                    fail = true;
 +
 +                    e.addSuppressed(r.getError());
 +                }
 +            }
 +
 +            if (fail)
 +                onDone(e);
 +            else
 +                onDone();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
 +            ctx.state().cgsLocFut.set(null);
 +
 +            return super.onDone(res, err);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(GridChangeGlobalStateFuture.class, this);
 +        }
 +    }
 +
 +    /**
 +     *
 +     *
 +     */
 +    private static class ChangeGlobalStateContext {
 +        /** Request id. */
 +        private final UUID requestId;
 +
 +        /** Initiating node id. */
 +        private final UUID initiatingNodeId;
 +
 +        /** Batch requests. */
 +        private final DynamicCacheChangeBatch batch;
 +
 +        /** Activate. */
 +        private final boolean activate;
 +
 +        /** Topology version. */
 +        private AffinityTopologyVersion topVer;
 +
 +        /** Fail. */
 +        private boolean fail;
 +
 +        /** Async activate future. */
 +        private IgniteInternalFuture<?> asyncActivateFut;
 +
 +        /**
 +         *
 +         */
 +        public ChangeGlobalStateContext(
 +            UUID requestId,
 +            UUID initiatingNodeId,
 +            DynamicCacheChangeBatch batch,
 +            boolean activate
 +        ) {
 +            this.requestId = requestId;
 +            this.batch = batch;
 +            this.activate = activate;
 +            this.initiatingNodeId = initiatingNodeId;
 +        }
 +
 +        /**
 +         * @param topVer Topology version.
 +         */
 +        public void topologyVersion(AffinityTopologyVersion topVer) {
 +            this.topVer = topVer;
 +        }
 +
 +        /**
 +         *
 +         */
 +        private void setFail() {
 +            fail = true;
 +        }
 +
 +        /**
 +         *
 +         */
 +        private boolean isFail() {
 +            return fail;
 +        }
 +
 +        /**
 +         *
 +         */
 +        public IgniteInternalFuture<?> getAsyncActivateFut() {
 +            return asyncActivateFut;
 +        }
 +
 +        /**
 +         * @param asyncActivateFut Async activate future.
 +         */
 +        public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) {
 +            this.asyncActivateFut = asyncActivateFut;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(ChangeGlobalStateContext.class, this);
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /** Activation. */
 +        private final boolean activation;
 +
 +        /** Ignite. */
 +        @IgniteInstanceResource
 +        private Ignite ignite;
 +
 +        /**
 +         *
 +         */
 +        private ClientChangeGlobalStateComputeRequest(boolean activation) {
 +            this.activation = activation;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void run() {
 +            ignite.active(activation);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 5e15b46,3178e92..57ae7c6
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@@ -52,10 -39,18 +49,8 @@@ import org.apache.ignite.lang.IgniteClo
  import org.jetbrains.annotations.Nullable;
  import org.jsr166.ConcurrentHashMap8;
  
 -import java.util.ArrayList;
 -import java.util.Collection;
 -import java.util.Collections;
 -import java.util.HashMap;
 -import java.util.List;
 -import java.util.ListIterator;
 -import java.util.Map;
 -import java.util.UUID;
 -import java.util.concurrent.ConcurrentMap;
 -
  import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
  import static org.apache.ignite.IgniteSystemProperties.getBoolean;
- import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
- import static org.apache.ignite.igfs.IgfsMode.PROXY;
  import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
  
  /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 0000000,66c19a0..6d59f89
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@@ -1,0 -1,363 +1,363 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.marshaller;
+ 
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
 -import java.util.concurrent.ExecutorService;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.cluster.ClusterNode;
+ import org.apache.ignite.events.DiscoveryEvent;
+ import org.apache.ignite.events.Event;
+ import org.apache.ignite.internal.GridKernalContext;
+ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+ import org.apache.ignite.internal.MarshallerContextImpl;
+ import org.apache.ignite.internal.managers.communication.GridIoManager;
+ import org.apache.ignite.internal.managers.communication.GridMessageListener;
+ import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+ import org.apache.ignite.internal.processors.GridProcessorAdapter;
+ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+ import org.apache.ignite.internal.util.future.GridFutureAdapter;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgniteFuture;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+ import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentHashMap8;
+ 
+ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
+ import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+ 
+ /**
+  * Processor responsible for managing custom {@link DiscoveryCustomMessage}
+  * events for exchanging marshalling mappings between nodes in grid.
+  *
+  * In particular it processes two flows:
+  * <ul>
+  *     <li>
+  *         Some node, server or client, wants to add new mapping for some class.
+  *         In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used.
+  *     </li>
+  *     <li>
+  *         As discovery events are delivered to clients asynchronously,
+  *         client node may not have some mapping when server nodes in the grid are already allowed to use the mapping.
+  *         In that situation client sends a {@link MissingMappingRequestMessage} request
+  *         and processor handles it as well as {@link MissingMappingResponseMessage} message.
+  *     </li>
+  * </ul>
+  */
+ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
+     /** */
+     private final MarshallerContextImpl marshallerCtx;
+ 
+     /** */
+     private final GridClosureProcessor closProc;
+ 
+     /** */
+     private final List<MappingUpdatedListener> mappingUpdatedLsnrs = new CopyOnWriteArrayList<>();
+ 
+     /** */
+     private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap
+             = new ConcurrentHashMap8<>();
+ 
+     /** */
+     private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+ 
+     /**
+      * @param ctx Kernal context.
+      */
+     public GridMarshallerMappingProcessor(GridKernalContext ctx) {
+         super(ctx);
+ 
+         marshallerCtx = ctx.marshallerContext();
+ 
+         closProc = ctx.closure();
+     }
+ 
+     /** {@inheritDoc} */
 -    @Override public void start() throws IgniteCheckedException {
++    @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+         GridDiscoveryManager discoMgr = ctx.discovery();
+         GridIoManager ioMgr = ctx.io();
+ 
+         MarshallerMappingTransport transport = new MarshallerMappingTransport(
+                 ctx,
+                 mappingExchangeSyncMap,
+                 clientReqSyncMap
+         );
++
+         marshallerCtx.onMarshallerProcessorStarted(ctx, transport);
+ 
+         discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener());
+ 
+         discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener());
+ 
+         if (ctx.clientNode())
+             ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener());
+         else
+             ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr));
+ 
+         if (ctx.clientNode())
+             ctx.event().addLocalEventListener(new GridLocalEventListener() {
+                 @Override public void onEvent(Event evt) {
+                     DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+ 
+                     if (!ctx.isStopping()) {
+                         for (ClientRequestFuture fut : clientReqSyncMap.values())
+                             fut.onNodeLeft(evt0.eventNode().id());
+                     }
+                 }
+             }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+     }
+ 
+     /**
+      * Adds a listener to be notified when mapping changes.
+      *
+      * @param mappingUpdatedListener listener for mapping updated events.
+      */
+     public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) {
+         mappingUpdatedLsnrs.add(mappingUpdatedListener);
+     }
+ 
+     /**
+      * Gets an iterator over all current mappings.
+      *
+      * @return Iterator over current mappings.
+      */
+     public Iterator<Map.Entry<Byte, Map<Integer, String>>> currentMappings() {
+         return marshallerCtx.currentMappings();
+     }
+ 
+     /**
+      *
+      */
+     private final class MissingMappingRequestListener implements GridMessageListener {
+         /** */
+         private final GridIoManager ioMgr;
+ 
+         /**
+          * @param ioMgr Io manager.
+          */
+         MissingMappingRequestListener(GridIoManager ioMgr) {
+             this.ioMgr = ioMgr;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             assert msg instanceof MissingMappingRequestMessage : msg;
+ 
+             MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
+ 
+             byte platformId = msg0.platformId();
+             int typeId = msg0.typeId();
+ 
+             String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
+ 
+             try {
+                 ioMgr.sendToGridTopic(
+                         nodeId,
+                         TOPIC_MAPPING_MARSH,
+                         new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
+                         SYSTEM_POOL);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to send missing mapping response.", e);
+             }
+         }
+     }
+ 
+     /**
+      *
+      */
+     private final class MissingMappingResponseListener implements GridMessageListener {
+         /** {@inheritDoc} */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             assert msg instanceof MissingMappingResponseMessage : msg;
+ 
+             MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;
+ 
+             byte platformId = msg0.platformId();
+             int typeId = msg0.typeId();
+             String resolvedClsName = msg0.className();
+ 
+             MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null);
+ 
+             GridFutureAdapter<MappingExchangeResult> fut = clientReqSyncMap.get(item);
+ 
+             if (fut != null) {
+                 if (resolvedClsName != null) {
+                     marshallerCtx.onMissedMappingResolved(item, resolvedClsName);
+ 
+                     fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
+                 }
+                 else
+                     fut.onDone(MappingExchangeResult.createFailureResult(
+                             new IgniteCheckedException(
+                                     "Failed to resolve mapping [platformId: "
+                                             + platformId
+                                             + ", typeId: "
+                                             + typeId + "]")));
+             }
+         }
+     }
+ 
+     /**
+      *
+      */
+     private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> {
+         /** {@inheritDoc} */
+         @Override public void onCustomEvent(
+                 AffinityTopologyVersion topVer,
+                 ClusterNode snd,
+                 MappingProposedMessage msg
+         ) {
+             if (!ctx.isStopping()) {
+                 if (msg.duplicated())
+                     return;
+ 
+                 if (!msg.inConflict()) {
+                     MarshallerMappingItem item = msg.mappingItem();
+                     String conflictingName = marshallerCtx.onMappingProposed(item);
+ 
+                     if (conflictingName != null) {
+                         if (conflictingName.equals(item.className()))
+                             msg.markDuplicated();
+                         else
+                             msg.conflictingWithClass(conflictingName);
+                     }
+                 }
+                 else {
+                     UUID origNodeId = msg.origNodeId();
+ 
+                     if (origNodeId.equals(ctx.localNodeId())) {
+                         GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem());
+ 
+                         assert fut != null: msg;
+ 
+                         fut.onDone(MappingExchangeResult.createFailureResult(
+                                 duplicateMappingException(msg.mappingItem(), msg.conflictingClassName())));
+                     }
+                 }
+             }
+         }
+ 
+         /**
+          * @param mappingItem Mapping item.
+          * @param conflictingClsName Conflicting class name.
+          */
+         private IgniteCheckedException duplicateMappingException(
+                 MarshallerMappingItem mappingItem,
+                 String conflictingClsName
+         ) {
+             return new IgniteCheckedException("Duplicate ID [platformId="
+                     + mappingItem.platformId()
+                     + ", typeId="
+                     + mappingItem.typeId()
+                     + ", oldCls="
+                     + conflictingClsName
+                     + ", newCls="
+                     + mappingItem.className() + "]");
+         }
+     }
+ 
+     /**
+      *
+      */
+     private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> {
+         /** {@inheritDoc} */
+         @Override public void onCustomEvent(
+                 AffinityTopologyVersion topVer,
+                 ClusterNode snd,
+                 MappingAcceptedMessage msg
+         ) {
+             final MarshallerMappingItem item = msg.getMappingItem();
+             marshallerCtx.onMappingAccepted(item);
+ 
+             closProc.runLocalSafe(new Runnable() {
+                 @Override public void run() {
+                     for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs)
+                         lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className());
+                 }
+             });
+ 
+             GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
+ 
+             if (fut != null)
+                 fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+         if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal()))
+             dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onGridDataReceived(GridDiscoveryData data) {
+         List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>) data.commonData();
+ 
+         if (mappings != null) {
+             for (int i = 0; i < mappings.size(); i++) {
+                 Map<Integer, MappedName> map;
+ 
+                 if ((map = mappings.get(i)) != null)
+                     marshallerCtx.onMappingDataReceived((byte) i, map);
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+         cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException(
+                 ctx.cluster().clientReconnectFuture(),
+                 "Failed to propose or request mapping, client node disconnected.")));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onKernalStop(boolean cancel) {
+         marshallerCtx.onMarshallerProcessorStop();
+ 
+         cancelFutures(MappingExchangeResult.createExchangeDisabledResult());
+     }
+ 
+     /** {@inheritDoc} */
+     @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+         return MARSHALLER_PROC;
+     }
+ 
+     /**
+      * @param res Response.
+      */
+     private void cancelFutures(MappingExchangeResult res) {
+         for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values())
+             fut.onDone(res);
+ 
+         for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values())
+             fut.onDone(res);
+     }
+ }