You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/15 10:45:12 UTC

[47/50] [abbrv] ignite git commit: Megre 2.0

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f05fdf7,cd4c55c..0309fa7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -403,7 -400,7 +404,8 @@@ public abstract class IgniteTxLocalAdap
          boolean skipVals,
          boolean needVer,
          boolean keepBinary,
 +        boolean recovery,
+         final ExpiryPolicy expiryPlc,
          final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
      ) {
          assert cacheCtx.isLocal() : cacheCtx.name();
@@@ -431,9 -430,11 +435,9 @@@
                          continue;
  
                      try {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         EntryGetResult res = entry.innerGetVersioned(
                              null,
                              this,
 -                            /*readSwap*/true,
 -                            /*unmarshal*/true,
                              /*update-metrics*/!skipVals,
                              /*event*/!skipVals,
                              CU.subjectId(this, cctx),
@@@ -1246,9 -1222,11 +1254,9 @@@
                                      F.first(txEntry.entryProcessors()) : null;
  
                              if (needVer) {
-                                 T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned(
+                                 EntryGetResult res = txEntry.cached().innerGetVersioned(
                                      null,
                                      this,
 -                                    /*swap*/true,
 -                                    /*unmarshal*/true,
                                      /*update-metrics*/true,
                                      /*event*/!skipVals,
                                      CU.subjectId(this, cctx),
@@@ -1499,7 -1486,7 +1511,8 @@@
                  skipVals,
                  needReadVer,
                  !deserializeBinary,
 +                recovery,
+                 expiryPlc,
                  new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
                      @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
                          if (isRollbackOnly()) {
@@@ -1675,10 -1665,12 +1692,10 @@@
                                              F.first(txEntry.entryProcessors()) : null;
  
                                      if (needVer) {
-                                         T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned(
+                                         EntryGetResult res = cached.innerGetVersioned(
                                              null,
                                              IgniteTxLocalAdapter.this,
 -                                            /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
 -                                            /*unmarshal*/true,
 -                                            /*update-metrics*/true,
 +                                            /**update-metrics*/true,
                                              /*event*/!skipVals,
                                              CU.subjectId(IgniteTxLocalAdapter.this, cctx),
                                              transformClo,
@@@ -2041,7 -2035,7 +2061,8 @@@
                      /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                      retval,
                      keepBinary,
 -                    expiryPlc);
++                    expiryPlc,
 +                    recovery);
              }
  
              return new GridFinishedFuture<>();
@@@ -2212,7 -2205,7 +2233,8 @@@
                      /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                      retval,
                      keepBinary,
 -                    expiryPlc);
++                    expiryPlc,
 +                    recovery);
              }
  
              return new GridFinishedFuture<>();
@@@ -2232,8 -2225,7 +2254,9 @@@
       * @param hasFilters {@code True} if filters not empty.
       * @param readThrough Read through flag.
       * @param retval Return value flag.
 +     * @param keepBinary Keep binary flag.
+      * @param expiryPlc Expiry policy.
 +     * @param recovery Recovery flag.
       * @return Load future.
       */
      private IgniteInternalFuture<Void> loadMissing(
@@@ -2248,8 -2240,7 +2271,9 @@@
          final boolean readThrough,
          final boolean retval,
          final boolean keepBinary,
 -        final ExpiryPolicy expiryPlc) {
++        final ExpiryPolicy expiryPlc,
 +        final boolean recovery
 +    ) {
          GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
              new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
                  @Override public void apply(KeyCacheObject key,
@@@ -2323,7 -2314,7 +2347,8 @@@
              /*skipVals*/singleRmv,
              needReadVer,
              keepBinary,
 +            recovery,
+             expiryPlc,
              c);
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index d6b09e4,f5687a0..cf1e7e2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@@ -192,6 -194,6 +194,7 @@@ public interface IgniteTxLocalEx extend
          boolean skipVals,
          boolean needVer,
          boolean keepBinary,
 +        boolean recovery,
+         final ExpiryPolicy expiryPlc,
          GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 124cb4b,d1c8b2d..a32b9d8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@@ -121,8 -139,27 +139,27 @@@ public class ClusterProcessor extends G
          }
      }
  
+ 
+     /**
+      * Scans the given values in iteration order and picks up the update notifier status
+      * stored under {@code ATTR_UPDATE_NOTIFIER_STATUS}. {@code null} elements are skipped.
+      * When several elements contain the key, the value from the last one wins.
+      *
+      * @param vals collection to seek through.
+      * @return Value mapped to {@code ATTR_UPDATE_NOTIFIER_STATUS} in the last element containing that key,
+      *      or {@code null} if no element contains it.
+      */
+     private Boolean findLastFlag(Collection<Serializable> vals) {
+         Boolean flag = null;
+ 
+         for (Serializable ser : vals) {
+             if (ser != null) {
+                 Map<String, Object> map = (Map<String, Object>) ser; // Unchecked cast: assumes every non-null element is a Map - TODO confirm against callers.
+ 
+                 if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
+                     flag = (Boolean) map.get(ATTR_UPDATE_NOTIFIER_STATUS); // Later elements override earlier ones.
+             }
+         }
+ 
+         return flag;
+     }
+ 
      /** {@inheritDoc} */
 -    @Override public void onKernalStart() throws IgniteCheckedException {
 +    @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
          if (notifyEnabled.get()) {
              try {
                  verChecker = new GridUpdateNotifier(ctx.gridName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 87d54a1,0000000..247d469
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@@ -1,947 -1,0 +1,955 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cluster;
 +
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.atomic.AtomicReference;
 +import org.apache.ignite.Ignite;
 +import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteCompute;
 +import org.apache.ignite.IgniteException;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.cluster.ClusterNode;
 +import org.apache.ignite.configuration.CacheConfiguration;
 +import org.apache.ignite.events.DiscoveryEvent;
 +import org.apache.ignite.events.Event;
 +import org.apache.ignite.internal.GridKernalContext;
 +import org.apache.ignite.internal.IgniteInternalFuture;
 +import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 +import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
 +import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 +import org.apache.ignite.internal.processors.GridProcessorAdapter;
 +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 +import org.apache.ignite.internal.processors.cache.ClusterState;
 +import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
 +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 +import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 +import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
 +import org.apache.ignite.internal.util.future.GridFinishedFuture;
 +import org.apache.ignite.internal.util.future.GridFutureAdapter;
 +import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 +import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 +import org.apache.ignite.internal.util.typedef.CI1;
 +import org.apache.ignite.internal.util.typedef.CI2;
 +import org.apache.ignite.internal.util.typedef.F;
 +import org.apache.ignite.internal.util.typedef.internal.CU;
 +import org.apache.ignite.internal.util.typedef.internal.S;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteFuture;
 +import org.apache.ignite.lang.IgniteRunnable;
 +import org.apache.ignite.resources.IgniteInstanceResource;
++import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 +import org.jetbrains.annotations.Nullable;
 +
 +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 +import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE;
 +import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE;
 +import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION;
 +
 +/**
 + *
 + */
 +public class GridClusterStateProcessor extends GridProcessorAdapter {
 +    /**
 +     * Cluster state as seen by this node. Initialized in {@code start(boolean)} to ACTIVE or INACTIVE
 +     * and switched to TRANSITION while a state change request is being processed.
 +     */
 +    private volatile ClusterState globalState;
 +
 +    /** Context of the state change request currently being processed; reset to {@code null} when it finishes. */
 +    private volatile ChangeGlobalStateContext lastCgsCtx;
 +
 +    /** Future of the state change requested through this node, if any. */
 +    private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>();
 +
 +    /** Cache processor, resolved from the kernal context in {@code start(boolean)}. */
 +    @GridToStringExclude
 +    private GridCacheProcessor cacheProc;
 +
 +    /** Shared cache context, taken from the cache processor in {@code start(boolean)}. */
 +    @GridToStringExclude
 +    private GridCacheSharedContext<?, ?> sharedCtx;
 +
 +    // TODO: consider adding an init latch.
 +
 +    /**
 +     * Local listener registered in {@code start(boolean)} for {@code EVT_NODE_LEFT} and
 +     * {@code EVT_NODE_FAILED}. If a local state change future is registered in {@link #cgsLocFut},
 +     * the discovery event is handed to it through a listener attached to the future's {@code initFut}.
 +     */
 +    private final GridLocalEventListener lsr = new GridLocalEventListener() {
 +        @Override public void onEvent(Event evt) {
 +            assert evt != null;
 +
 +            final DiscoveryEvent e = (DiscoveryEvent)evt;
 +
 +            assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
 +
 +            final GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +            if (f != null) // Nothing to notify when no state change was started from this node.
 +                f.initFut.listen(new CI1<IgniteInternalFuture>() {
 +                    @Override public void apply(IgniteInternalFuture fut) {
 +                        f.onDiscoveryEvent(e);
 +                    }
 +                });
 +        }
 +    };
 +
 +    /**
 +     * Creates the processor. No listeners or handlers are installed here; that happens in
 +     * {@code start(boolean)}.
 +     *
 +     * @param ctx Kernal context.
 +     */
 +    public GridClusterStateProcessor(GridKernalContext ctx) {
 +        super(ctx);
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Sets the initial cluster state from {@code activeOnStart}, resolves the cache processor and shared
 +     * context, and installs three hooks:
 +     * <ul>
 +     *     <li>an IO handler for {@link GridChangeGlobalStateMessageResponse};</li>
 +     *     <li>a custom discovery listener for {@link ChangeGlobalStateMessage} that either rejects a
 +     *     request arriving while another change is in transition, or creates a new change context
 +     *     and moves the state to TRANSITION;</li>
 +     *     <li>the local node-left / node-failed listener {@link #lsr}.</li>
 +     * </ul>
 +     *
 +     * @param activeOnStart {@code True} if the node starts with the cluster considered active.
 +     * @throws IgniteCheckedException If the parent start fails.
 +     */
 +    @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
 +        super.start(activeOnStart);
 +
 +        globalState = activeOnStart ? ACTIVE : INACTIVE;
 +        cacheProc = ctx.cache();
 +        sharedCtx = cacheProc.context();
 +
 +        sharedCtx.io().addHandler(0,
 +            GridChangeGlobalStateMessageResponse.class,
 +            new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
 +                @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
 +                    processChangeGlobalStateResponse(nodeId, msg);
 +                }
 +            });
 +
 +        ctx.discovery().setCustomEventListener(
 +            ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() {
 +                @Override public void onCustomEvent(
 +                    AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) {
 +                    assert topVer != null;
 +                    assert snd != null;
 +                    assert msg != null;
 +
 +                    boolean activate = msg.activate();
 +
 +                    ChangeGlobalStateContext actx = lastCgsCtx;
 +
 +                    if (actx != null && globalState == TRANSITION) {
 +                        GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +                        if (log.isDebugEnabled())
 +                            log.debug("Concurrent " + prettyStr(activate) + " [id=" +
 +                                ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]");
 +
 +                        // Fail only the future that belongs to the rejected request. The operation reported
 +                        // as "in progress" comes from the in-flight context, not from the rejected message.
 +                        if (f != null && f.requestId.equals(msg.requestId()))
 +                            f.onDone(new IgniteCheckedException(
 +                                "Concurrent change state, now in progress=" + prettyStr(actx.activate)
 +                                    + ", initiatingNodeId=" + actx.initiatingNodeId
 +                                    + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId()
 +                            ));
 +
 +                        msg.concurrentChangeState();
 +                    }
 +                    else {
 +                        if (log.isInfoEnabled())
 +                            log.info("Create " + prettyStr(activate) + " context [id=" +
 +                                ctx.localNodeId() + " topVer=" + topVer + ", reqId=" +
 +                                msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]");
 +
 +                        lastCgsCtx = new ChangeGlobalStateContext(
 +                            msg.requestId(),
 +                            msg.initiatorNodeId(),
 +                            msg.getDynamicCacheChangeBatch(),
 +                            msg.activate());
 +
 +                        globalState = TRANSITION;
 +                    }
 +                }
 +            });
 +
 +        ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Removes the response handler and the local event listener installed in {@code start(boolean)},
 +     * completes a pending local state change future (if any) with an
 +     * {@link IgniteInterruptedCheckedException}, and clears the future reference.
 +     */
 +    @Override public void stop(boolean cancel) throws IgniteCheckedException {
 +        super.stop(cancel);
 +
 +        sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class);
 +        ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 +
 +        IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
 +            "Node is stopping: " + ctx.gridName());
 +
 +        GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +        if (f != null)
 +            f.onDone(stopErr);
 +
 +        cgsLocFut.set(null); // NOTE(review): get() and set(null) are two separate steps, not one atomic swap.
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     *
 +     * @return Always {@code DiscoveryDataExchangeType.STATE_PROC}.
 +     */
 +    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
 +        return DiscoveryDataExchangeType.STATE_PROC;
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Adds the current {@link #globalState} to the bag as joining node data, keyed by the ordinal
 +     * of {@code DiscoveryDataExchangeType.STATE_PROC}.
 +     */
-     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
-         return globalState;
++    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
++        dataBag.addJoiningNodeData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState);
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Adds the current {@link #globalState} to the bag as grid common data, keyed by the ordinal
 +     * of {@code DiscoveryDataExchangeType.STATE_PROC}.
 +     */
-     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
-         if (ctx.localNodeId().equals(joiningNodeId))
-             globalState = (ClusterState)data;
++    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
++        dataBag.addGridCommonData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState);
++    }
++
++    /**
++     * {@inheritDoc}
++     * <p>
++     * Replaces the local {@link #globalState} with the common data received from the grid.
++     * NOTE(review): the value is assigned without a null check - confirm that common data is always present.
++     */
++    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
++        globalState = (ClusterState)data.commonData();
++    }
++
++    /**
++     * {@inheritDoc}
++     * <p>
++     * Data sent by a joining node is ignored by this processor.
++     */
++    @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
++        // No-op.
 +    }
 +
 +    /**
 +     *
 +     */
 +    public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
 +        if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
 +            throw new IgniteException("Cannot " + prettyStr(activate) + " cluster, because cache locked on transaction.");
 +
 +        if ((this.globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate))
 +            return new GridFinishedFuture<>();
 +
 +        final UUID requestId = UUID.randomUUID();
 +
 +        final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx);
 +
 +        if (!cgsLocFut.compareAndSet(null, cgsFut)) {
 +            GridChangeGlobalStateFuture locF = cgsLocFut.get();
 +
 +            if (locF.activate == activate)
 +                return locF;
 +            else
 +                return new GridFinishedFuture<>(new IgniteException(
 +                    "fail " + prettyStr(activate) + ", because now in progress" + prettyStr(locF.activate)));
 +        }
 +
 +        try {
 +            if (ctx.clientNode()) {
 +                AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 +
 +                IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers())
 +                    .compute().withAsync();
 +
 +                if (log.isInfoEnabled())
 +                    log.info("Send " + prettyStr(activate) + " request from client node [id=" +
 +                        ctx.localNodeId() + " topVer=" + topVer + " ]");
 +
 +                comp.run(new ClientChangeGlobalStateComputeRequest(activate));
 +
 +                comp.future().listen(new CI1<IgniteFuture>() {
 +                    @Override public void apply(IgniteFuture fut) {
 +                        try {
 +                            fut.get();
 +
 +                            cgsFut.onDone();
 +                        }
 +                        catch (Exception e) {
 +                            cgsFut.onDone(e);
 +                        }
 +                    }
 +                });
 +            }
 +            else {
 +                List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 +
 +                DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
 +                    requestId, null, ctx.localNodeId());
 +
 +                changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
 +
 +                reqs.add(changeGlobalStateReq);
 +
 +                reqs.addAll(activate ? cacheProc.startAllCachesRequests() : cacheProc.stopAllCachesRequests());
 +
 +                ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
 +                    requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs));
 +
 +                try {
 +                    ctx.discovery().sendCustomEvent(changeGlobalStateMsg);
 +
 +                    if (ctx.isStopping())
 +                        cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
 +                            "node is stopping."));
 +                }
 +                catch (IgniteCheckedException e) {
 +                    log.error("Fail create or send change global state request." + cgsFut, e);
 +
 +                    cgsFut.onDone(e);
 +                }
 +            }
 +        }
 +        catch (IgniteCheckedException e) {
 +            log.error("Fail create or send change global state request." + cgsFut, e);
 +
 +            cgsFut.onDone(e);
 +        }
 +
 +        return cgsFut;
 +    }
 +
 +    /**
 +     * Tells whether the cluster is considered active from this node's point of view.
 +     * While a change is in TRANSITION the previous state is reported: a pending deactivation
 +     * still counts as active, and a pending activation still counts as inactive.
 +     *
 +     * @return {@code True} if the cluster is considered active.
 +     */
 +    public boolean active() {
 +        ChangeGlobalStateContext actx = lastCgsCtx;
 +
 +        if (actx != null && !actx.activate && globalState == TRANSITION) // Deactivation in progress.
 +            return true;
 +
 +        if (actx != null && actx.activate && globalState == TRANSITION) // Activation in progress.
 +            return false;
 +
 +        return globalState == ACTIVE;
 +    }
 +
 +    /**
 +     * Checks whether the given batch contains a global state change request and, if so, records
 +     * the topology version on the current change context.
 +     *
 +     * @param reqs Requests.
 +     * @param topVer Topology version to store in the current change context.
 +     * @return {@code True} if at least one request is a global state change, {@code false} otherwise.
 +     */
 +    public boolean changeGlobalState(
 +        Collection<DynamicCacheChangeRequest> reqs,
 +        AffinityTopologyVersion topVer
 +    ) {
 +        assert !F.isEmpty(reqs);
 +        assert topVer != null;
 +
 +        for (DynamicCacheChangeRequest req : reqs)
 +            if (req.globalStateChange()) {
 +                ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 +
 +                assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
 +
 +                cgsCtx.topologyVersion(topVer);
 +
 +                return true; // Only the first state change request in the batch is considered.
 +            }
 +
 +
 +        return false;
 +    }
 +
 +    /**
 +     * Invoke from exchange future.
 +     * <p>
 +     * Passes the topology version of the current change context to the local future (if one is
 +     * registered) and then runs the first phase of activation or deactivation, depending on the context.
 +     *
 +     * @return Exception raised by the activation / deactivation phase, or {@code null} on success.
 +     */
 +    public Exception onChangeGlobalState() {
 +        GridChangeGlobalStateFuture f = cgsLocFut.get();
 +
 +        ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 +
 +        assert cgsCtx != null;
 +
 +        if (f != null)
 +            f.setRemaining(cgsCtx.topVer);
 +
 +        return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx);
 +    }
 +
 +    /**
 +     * Handles a failed state change: marks the current change context as failed, reverts a failed
 +     * activation (stops caches, removes affinity cache info and, on non-client nodes, deactivates
 +     * database, page store and WAL), restores {@link #globalState} to the state preceding the request
 +     * and completes the matching local future with an exception that carries the given errors as suppressed.
 +     * <p>
 +     * NOTE(review): unlike the other exchange callbacks, the current context is dereferenced here
 +     * without a null assertion.
 +     *
 +     * @param exs Exceptions keyed by UUID (presumably the id of the node that reported each one - verify against caller); must not be empty.
 +     */
 +    public void onFullResponseMessage(Map<UUID, Exception> exs) {
 +        assert !F.isEmpty(exs);
 +
 +        ChangeGlobalStateContext actx = lastCgsCtx;
 +
 +        actx.setFail();
 +
 +        // revert change if activation request fail
 +        if (actx.activate) {
 +            try {
 +                cacheProc.onKernalStopCaches(true);
 +
 +                cacheProc.stopCaches(true);
 +
 +                sharedCtx.affinity().removeAllCacheInfo();
 +
 +                if (!ctx.clientNode()) {
 +                    sharedCtx.database().onDeActivate(ctx);
 +
 +                    if (sharedCtx.pageStore() != null)
 +                        sharedCtx.pageStore().onDeActivate(ctx);
 +
 +                    if (sharedCtx.wal() != null)
 +                        sharedCtx.wal().onDeActivate(ctx);
 +                }
 +            }
 +            catch (Exception e) {
 +                for (Map.Entry<UUID, Exception> entry : exs.entrySet())
 +                    e.addSuppressed(entry.getValue());
 +
 +                log.error("Fail while revert activation request changes", e); // Revert failure is logged only, not rethrown.
 +            }
 +        }
 +        else {
 +            //todo revert change if deactivate request fail
 +        }
 +
 +        globalState = actx.activate ? INACTIVE : ACTIVE; // Back to the state that preceded the failed request.
 +
 +        GridChangeGlobalStateFuture af = cgsLocFut.get();
 +
 +        if (af != null && af.requestId.equals(actx.requestId)) {
 +            IgniteCheckedException e = new IgniteCheckedException("see suppressed");
 +
 +            for (Map.Entry<UUID, Exception> entry : exs.entrySet())
 +                e.addSuppressed(entry.getValue());
 +
 +            af.onDone(e);
 +        }
 +    }
 +
 +    /**
 +     * First phase of activation. On non-client nodes: locks the database, activates the page store
 +     * and WAL when they are present, initializes the database, initializes the page store for every
 +     * cache configuration found in the request batch (system caches first, then the rest) and finally
 +     * activates the database. Client nodes only log.
 +     *
 +     * @param cgsCtx Context of the state change being processed.
 +     * @return {@code null} on success, otherwise the exception that interrupted activation
 +     *      (the database is unlocked on non-client nodes before returning it).
 +     */
 +    private Exception onActivate(ChangeGlobalStateContext cgsCtx) {
 +        final boolean client = ctx.clientNode();
 +
 +        if (log.isInfoEnabled())
 +            log.info("Start activation process [nodeId=" + this.ctx.localNodeId() + ", client=" + client +
 +                ", topVer=" + cgsCtx.topVer + "]");
 +
 +        Collection<CacheConfiguration> cfgs = new ArrayList<>();
 +
 +        for (DynamicCacheChangeRequest req : cgsCtx.batch.requests()) {
 +            if (req.startCacheConfiguration() != null)
 +                cfgs.add(req.startCacheConfiguration());
 +        }
 +
 +        try {
 +            if (!client) {
 +                sharedCtx.database().lock();
 +
 +                IgnitePageStoreManager pageStore = sharedCtx.pageStore();
 +
 +                if (pageStore != null)
 +                    pageStore.onActivate(ctx);
 +
 +                if (sharedCtx.wal() != null)
 +                    sharedCtx.wal().onActivate(ctx);
 +
 +                sharedCtx.database().initDataBase();
 +
 +                for (CacheConfiguration cfg : cfgs) { // First pass: system caches.
 +                    if (CU.isSystemCache(cfg.getName()))
 +                        if (pageStore != null)
 +                            pageStore.initializeForCache(cfg);
 +                }
 +
 +                for (CacheConfiguration cfg : cfgs) { // Second pass: all remaining caches.
 +                    if (!CU.isSystemCache(cfg.getName()))
 +                        if (pageStore != null)
 +                            pageStore.initializeForCache(cfg);
 +                }
 +
 +                sharedCtx.database().onActivate(ctx);
 +            }
 +
 +            if (log.isInfoEnabled())
 +                log.info("Success activate wal, dataBase, pageStore [nodeId="
 +                    + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
 +
 +            return null;
 +        }
 +        catch (Exception e) {
 +            log.error("Fail activate wal, dataBase, pageStore [nodeId=" + ctx.localNodeId() + ", client=" + client +
 +                ", topVer=" + cgsCtx.topVer + "]", e);
 +
 +            if (!ctx.clientNode())
 +                sharedCtx.database().unLock(); // NOTE(review): also runs when lock() itself was the call that failed.
 +
 +            return e;
 +        }
 +    }
 +
 +    /**
 +     * First phase of deactivation: deactivates data structures and services. On non-client nodes the
 +     * database is unlocked afterwards regardless of the outcome.
 +     * <p>
 +     * NOTE(review): the success log message also names database, pageStore and wal, which this
 +     * method does not deactivate; that happens in {@code onFinalDeActivate}.
 +     *
 +     * @param cgsCtx Context of the state change being processed.
 +     * @return {@code null} on success, otherwise the exception thrown during deactivation.
 +     */
 +    public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) {
 +        final boolean client = ctx.clientNode();
 +
 +        if (log.isInfoEnabled())
 +            log.info("Start deactivate process [id=" + ctx.localNodeId() + ", client=" +
 +                client + ", topVer=" + cgsCtx.topVer + "]");
 +
 +        try {
 +            ctx.dataStructures().onDeActivate(ctx);
 +
 +            ctx.service().onDeActivate(ctx);
 +
 +            if (log.isInfoEnabled())
 +                log.info("Success deactivate services, dataStructures, database, pageStore, wal [id=" + ctx.localNodeId() + ", client=" +
 +                    client + ", topVer=" + cgsCtx.topVer + "]");
 +
 +            return null;
 +        }
 +        catch (Exception e) {
 +            log.error("DeActivation fail [nodeId=" + ctx.localNodeId() + ", client=" + client +
 +                ", topVer=" + cgsCtx.topVer + "]", e);
 +
 +            return e;
 +        }
 +        finally {
 +            if (!client)
 +                sharedCtx.database().unLock();
 +        }
 +    }
 +
 +    /**
 +     * Final phase of activation. Submits a local task that notifies cache objects (skipped on daemon
 +     * nodes), services and data structures about activation. Whatever the outcome, the task then
 +     * switches {@link #globalState} to ACTIVE, sends the response (with the caught exception, if any)
 +     * to the initiating node and clears the current change context.
 +     * <p>
 +     * The future of the submitted task is stored in the given context.
 +     *
 +     * @param cgsCtx Context of the state change being finished.
 +     */
 +    private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) {
 +        IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() {
 +            @Override public void run() {
 +                boolean client = ctx.clientNode();
 +
 +                Exception e = null;
 +
 +                try {
-                     ctx.marshallerContext().onMarshallerCacheStarted(ctx);
- 
 +                    if (!ctx.config().isDaemon())
 +                        ctx.cacheObjects().onUtilityCacheStarted();
 +
 +                    ctx.service().onUtilityCacheStarted();
 +
 +                    ctx.service().onActivate(ctx);
 +
 +                    ctx.dataStructures().onActivate(ctx);
 +
 +                    if (log.isInfoEnabled())
 +                        log.info("Success final activate [nodeId="
 +                            + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
 +                }
 +                catch (Exception ex) {
 +                    e = ex;
 +
 +                    // Use the captured context: the lastCgsCtx field is mutable and may already be
 +                    // null or replaced, which would raise an NPE here and lose this error report.
 +                    log.error("Fail activate finished [nodeId=" + ctx.localNodeId() + ", client=" + client +
 +                        ", topVer=" + cgsCtx.topVer + "]", ex);
 +                }
 +                finally {
 +                    globalState = ACTIVE;
 +
 +                    sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e);
 +
 +                    GridClusterStateProcessor.this.lastCgsCtx = null;
 +                }
 +            }
 +        });
 +
 +        cgsCtx.setAsyncActivateFut(asyncActivateFut);
 +    }
 +
 +    /**
 +     * Final phase of deactivation. On non-client nodes deactivates the database, the page store and
 +     * WAL (when present) and removes all affinity cache info. Whatever the outcome, switches
 +     * {@link #globalState} to INACTIVE, sends the response (with the caught exception, if any) to the
 +     * initiating node and clears the current change context.
 +     *
 +     * @param cgsCtx Context of the state change being finished.
 +     */
 +    public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) {
 +        final boolean client = ctx.clientNode();
 +
 +        Exception ex = null;
 +
 +        try {
 +            if (!client) {
 +                sharedCtx.database().onDeActivate(ctx);
 +
 +                if (sharedCtx.pageStore() != null)
 +                    sharedCtx.pageStore().onDeActivate(ctx);
 +
 +                if (sharedCtx.wal() != null)
 +                    sharedCtx.wal().onDeActivate(ctx);
 +
 +                sharedCtx.affinity().removeAllCacheInfo();
 +            }
 +
 +            // Report success only after the deactivation steps have actually completed.
 +            if (log.isInfoEnabled())
 +                log.info("Success final deactivate [nodeId="
 +                    + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
 +        }
 +        catch (Exception e) {
 +            ex = e;
 +
 +            // Log locally as well; previously the failure was only shipped to the initiating node.
 +            log.error("Fail final deactivate [nodeId=" + ctx.localNodeId() + ", client=" + client +
 +                ", topVer=" + cgsCtx.topVer + "]", e);
 +        }
 +        finally {
 +            globalState = INACTIVE;
 +        }
 +
 +        sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex);
 +
 +        this.lastCgsCtx = null;
 +    }
 +
 +    /**
 +     * Finishes the pending change global state request stored in {@code lastCgsCtx}: a failed request
 +     * is simply dropped, otherwise the final activate or deactivate step is started depending on
 +     * the request's {@code activate} flag.
 +     */
 +    public void onExchangeDone() {
 +        ChangeGlobalStateContext stateCtx = lastCgsCtx;
 +
 +        assert stateCtx != null;
 +
 +        // Failed request: nothing to finalize, just forget the context.
 +        if (stateCtx.isFail()) {
 +            lastCgsCtx = null;
 +
 +            return;
 +        }
 +
 +        if (stateCtx.activate)
 +            onFinalActivate(stateCtx);
 +        else
 +            onFinalDeActivate(stateCtx);
 +    }
 +
 +    /**
 +     * Builds a change global state response and delivers it to the initiating node: processed in place
 +     * when the initiator is the local node, sent through the IO manager otherwise. A send failure is
 +     * logged and not rethrown.
 +     *
 +     * @param requestId Request id.
 +     * @param initNodeId Initiating node id.
 +     * @param ex Exception to report, or {@code null}.
 +     */
 +    private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) {
 +        assert requestId != null;
 +        assert initNodeId != null;
 +
 +        try {
 +            GridChangeGlobalStateMessageResponse resp = new GridChangeGlobalStateMessageResponse(requestId, ex);
 +
 +            if (log.isDebugEnabled())
 +                log.debug("Send change global state response [nodeId=" + ctx.localNodeId() +
 +                    ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + resp + "]");
 +
 +            boolean locInitiator = ctx.localNodeId().equals(initNodeId);
 +
 +            if (!locInitiator)
 +                sharedCtx.io().send(initNodeId, resp, SYSTEM_POOL);
 +            else
 +                processChangeGlobalStateResponse(ctx.localNodeId(), resp);
 +        }
 +        catch (IgniteCheckedException e) {
 +            log.error("Fail send change global state response to " + initNodeId, e);
 +        }
 +    }
 +
 +    /**
 +     * Routes a change global state response to the local future, provided the sender is still known
 +     * to discovery and the future exists, is not done and carries the same request id. The response
 +     * is applied from a listener on the future's {@code initFut}.
 +     *
 +     * @param nodeId Id of the node that produced the response.
 +     * @param msg Message.
 +     */
 +    private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
 +        assert nodeId != null;
 +        assert msg != null;
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Received activation response [requestId=" + msg.getRequestId() +
 +                ", nodeId=" + nodeId + "]");
 +
 +        if (ctx.discovery().node(nodeId) == null) {
 +            U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" +
 +                msg.getRequestId() + ']');
 +
 +            return;
 +        }
 +
 +        final GridChangeGlobalStateFuture locFut = cgsLocFut.get();
 +
 +        // Ignore responses that do not belong to the request currently tracked locally.
 +        if (locFut == null || locFut.isDone() || !msg.getRequestId().equals(locFut.requestId))
 +            return;
 +
 +        locFut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
 +            @Override public void apply(IgniteInternalFuture<?> f) {
 +                locFut.onResponse(nodeId, msg);
 +            }
 +        });
 +    }
 +
 +
 +
 +    /**
 +     * @param activate Activate.
 +     * @return {@code "activate"} when the flag is set, {@code "deactivate"} otherwise.
 +     */
 +    private String prettyStr(boolean activate) {
 +        if (activate)
 +            return "activate";
 +
 +        return "deactivate";
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        // String form is produced by the S.toString helper for this class.
 +        return S.toString(GridClusterStateProcessor.class, this);
 +    }
 +
 +    /**
 +     * Future tracking a single change global state request. Node ids registered through
 +     * {@link #setRemaining(AffinityTopologyVersion)} are removed as responses or discovery events
 +     * for them arrive; once the set becomes empty the future completes, failing if any collected
 +     * response carries an error.
 +     */
 +    private static class GridChangeGlobalStateFuture extends GridFutureAdapter {
 +        /** Request id. */
 +        @GridToStringInclude
 +        private final UUID requestId;
 +
 +        /** Activate. */
 +        private final boolean activate;
 +
 +        /** Ids of nodes that have not been accounted for yet. Guarded by {@link #mux}. */
 +        @GridToStringInclude
 +        private final Set<UUID> remaining = new HashSet<>();
 +
 +        /** Responses keyed by node id. Guarded by {@link #mux}. */
 +        @GridToStringInclude
 +        private final Map<UUID, GridChangeGlobalStateMessageResponse> resps = new HashMap<>();
 +
 +        /** Context. */
 +        @GridToStringExclude
 +        private final GridKernalContext ctx;
 +
 +        /** Mutex guarding {@link #remaining} and {@link #resps}. */
 +        @GridToStringExclude
 +        private final Object mux = new Object();
 +
 +        /** Completed by {@link #setRemaining(AffinityTopologyVersion)} once the node set is filled. */
 +        @GridToStringInclude
 +        private final GridFutureAdapter initFut = new GridFutureAdapter();
 +
 +        /** Grid logger. */
 +        @GridToStringExclude
 +        private final IgniteLogger log;
 +
 +        /**
 +         * @param requestId Request id.
 +         * @param activate Activate.
 +         * @param ctx Kernal context.
 +         */
 +        public GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
 +            this.requestId = requestId;
 +            this.activate = activate;
 +            this.ctx = ctx;
 +            this.log = ctx.log(getClass());
 +        }
 +
 +        /**
 +         * Stops waiting for the node the event refers to; completes the future if it was the last one.
 +         *
 +         * @param event Event.
 +         */
 +        public void onDiscoveryEvent(DiscoveryEvent event) {
 +            assert event != null;
 +
 +            if (isDone())
 +                return;
 +
 +            boolean allReceived = false;
 +
 +            synchronized (mux) {
 +                if (remaining.remove(event.eventNode().id()))
 +                    allReceived = remaining.isEmpty();
 +            }
 +
 +            if (allReceived)
 +                onAllReceived();
 +        }
 +
 +        /**
 +         * Registers the ids of all nodes of the given topology version as awaited and completes
 +         * {@link #initFut}.
 +         *
 +         * @param topVer Topology version whose nodes are awaited.
 +         */
 +        public void setRemaining(AffinityTopologyVersion topVer) {
 +            Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
 +
 +            List<UUID> ids = new ArrayList<>(nodes.size());
 +
 +            for (ClusterNode n : nodes)
 +                ids.add(n.id());
 +
 +            if (log.isDebugEnabled())
 +                log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" +
 +                    ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() +
 +                    ", nodes=" + Arrays.toString(ids.toArray()) + "]");
 +
 +            synchronized (mux) {
 +                remaining.addAll(ids);
 +            }
 +
 +            initFut.onDone();
 +        }
 +
 +        /**
 +         * Records the response of the given node; completes the future if it was the last awaited one.
 +         *
 +         * @param nodeId Id of the responding node.
 +         * @param msg Activation message response.
 +         */
 +        public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
 +            assert msg != null;
 +
 +            if (isDone())
 +                return;
 +
 +            boolean allReceived = false;
 +
 +            synchronized (mux) {
 +                if (remaining.remove(nodeId))
 +                    allReceived = remaining.isEmpty();
 +
 +                resps.put(nodeId, msg);
 +            }
 +
 +            if (allReceived)
 +                onAllReceived();
 +        }
 +
 +        /**
 +         * Completes the future: with an error holding every response error as a suppressed exception
 +         * if at least one response failed, normally otherwise.
 +         */
 +        private void onAllReceived() {
 +            Throwable e = null;
 +
 +            // resps is a plain HashMap written under mux in onResponse(), which may still be invoked
 +            // by late responses, so it has to be read under the same lock.
 +            synchronized (mux) {
 +                for (GridChangeGlobalStateMessageResponse r : resps.values()) {
 +                    if (r.getError() != null) {
 +                        // Created lazily: the success path needs no exception object.
 +                        if (e == null)
 +                            e = new Throwable();
 +
 +                        e.addSuppressed(r.getError());
 +                    }
 +                }
 +            }
 +
 +            // Completion is done outside the lock so that listeners never run under mux.
 +            if (e != null)
 +                onDone(e);
 +            else
 +                onDone();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
 +            // Reset the processor's local future reference before completing.
 +            ctx.state().cgsLocFut.set(null);
 +
 +            return super.onDone(res, err);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(GridChangeGlobalStateFuture.class, this);
 +        }
 +    }
 +
 +    /**
 +     * Holder for the parameters and progress flags of a single change global state request.
 +     */
 +    private static class ChangeGlobalStateContext {
 +        /** Request id. */
 +        private final UUID requestId;
 +
 +        /** Initiating node id. */
 +        private final UUID initiatingNodeId;
 +
 +        /** Batch requests. */
 +        private final DynamicCacheChangeBatch batch;
 +
 +        /** {@code True} for an activation request, {@code false} for deactivation. */
 +        private final boolean activate;
 +
 +        /** Topology version, assigned after construction via {@link #topologyVersion(AffinityTopologyVersion)}. */
 +        private AffinityTopologyVersion topVer;
 +
 +        /** Fail flag, raised by {@link #setFail()}. */
 +        private boolean fail;
 +
 +        /** Async activate future. */
 +        private IgniteInternalFuture<?> asyncActivateFut;
 +
 +        /**
 +         * @param requestId Request id.
 +         * @param initiatingNodeId Initiating node id.
 +         * @param batch Batch requests.
 +         * @param activate Activate.
 +         */
 +        public ChangeGlobalStateContext(
 +            UUID requestId,
 +            UUID initiatingNodeId,
 +            DynamicCacheChangeBatch batch,
 +            boolean activate
 +        ) {
 +            this.requestId = requestId;
 +            this.batch = batch;
 +            this.activate = activate;
 +            this.initiatingNodeId = initiatingNodeId;
 +        }
 +
 +        /**
 +         * @param topVer Topology version.
 +         */
 +        public void topologyVersion(AffinityTopologyVersion topVer) {
 +            this.topVer = topVer;
 +        }
 +
 +        /**
 +         * Marks the request as failed.
 +         */
 +        private void setFail() {
 +            fail = true;
 +        }
 +
 +        /**
 +         * @return {@code True} if {@link #setFail()} has been called.
 +         */
 +        private boolean isFail() {
 +            return fail;
 +        }
 +
 +        /**
 +         * @return Async activate future, or {@code null} if it has not been set.
 +         */
 +        public IgniteInternalFuture<?> getAsyncActivateFut() {
 +            return asyncActivateFut;
 +        }
 +
 +        /**
 +         * @param asyncActivateFut Async activate future.
 +         */
 +        public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) {
 +            this.asyncActivateFut = asyncActivateFut;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(ChangeGlobalStateContext.class, this);
 +        }
 +    }
 +
 +    /**
 +     * Runnable that calls {@code active(boolean)} on its {@link Ignite} field with the flag
 +     * given at construction time.
 +     */
 +    private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /** Activation. */
 +        private final boolean activation;
 +
 +        /** Ignite instance; presumably injected through the resource annotation before {@link #run()}. */
 +        @IgniteInstanceResource
 +        private Ignite ignite;
 +
 +        /**
 +         * @param activation Flag passed to {@code Ignite.active(boolean)} when the runnable executes.
 +         */
 +        private ClientChangeGlobalStateComputeRequest(boolean activation) {
 +            this.activation = activation;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void run() {
 +            ignite.active(activation);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9012214,f97fc14..df2f7d9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@@ -1925,59 -1920,24 +1923,59 @@@ public class DataStreamerImpl<K, V> imp
  
              ExpiryPolicy plc = cctx.expiry();
  
 -            for (Entry<KeyCacheObject, CacheObject> e : entries) {
 -                try {
 -                    e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
 +            Collection<Integer> reservedParts = new HashSet<>();
 +            Collection<Integer> ignoredParts = new HashSet<>();
  
 -                    GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 +            try {
 +                for (Entry<KeyCacheObject, CacheObject> e : entries) {
 +                    cctx.shared().database().checkpointReadLock();
  
 -                    if (plc != null) {
 -                        ttl = CU.toTtl(plc.getExpiryForCreation());
 +                    try {
 +                        e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
  
 -                        if (ttl == CU.TTL_ZERO)
 -                            continue;
 -                        else if (ttl == CU.TTL_NOT_CHANGED)
 -                            ttl = 0;
 +                        if (!cctx.isLocal()) {
 +                            int p = cctx.affinity().partition(e.getKey());
  
 -                        expiryTime = CU.toExpireTime(ttl);
 -                    }
 +                            if (ignoredParts.contains(p))
 +                                continue;
 +
 +                            if (!reservedParts.contains(p)) {
 +                                GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, true);
 +
 +                                if (!part.reserve()) {
 +                                    ignoredParts.add(p);
 +
 +                                    continue;
 +                                }
 +                                else {
 +                                    // We must not allow to read from RENTING partitions.
 +                                    if (part.state() == GridDhtPartitionState.RENTING) {
 +                                        part.release();
 +
 +                                        ignoredParts.add(p);
 +
 +                                        continue;
 +                                    }
 +
 +                                    reservedParts.add(p);
 +                                }
 +                            }
 +                        }
 +
 +                        GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 +
 +                        if (plc != null) {
 +                            ttl = CU.toTtl(plc.getExpiryForCreation());
 +
 +                            if (ttl == CU.TTL_ZERO)
 +                                continue;
 +                            else if (ttl == CU.TTL_NOT_CHANGED)
 +                                ttl = 0;
 +
 +                            expiryTime = CU.toExpireTime(ttl);
 +                        }
  
-                     boolean primary = cctx.affinity().primary(cctx.localNode(), entry.key(), topVer);
+                     boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer);
  
                      entry.initialValue(e.getValue(),
                          ver,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 0000000,fdea869..3d2f86e
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@@ -1,0 -1,363 +1,363 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.marshaller;
+ 
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.ExecutorService;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.cluster.ClusterNode;
+ import org.apache.ignite.events.DiscoveryEvent;
+ import org.apache.ignite.events.Event;
+ import org.apache.ignite.internal.GridKernalContext;
+ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+ import org.apache.ignite.internal.MarshallerContextImpl;
+ import org.apache.ignite.internal.managers.communication.GridIoManager;
+ import org.apache.ignite.internal.managers.communication.GridMessageListener;
+ import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+ import org.apache.ignite.internal.processors.GridProcessorAdapter;
+ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+ import org.apache.ignite.internal.util.future.GridFutureAdapter;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgniteFuture;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+ import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentHashMap8;
+ 
+ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
+ import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+ 
+ /**
+  * Processor responsible for managing custom {@link DiscoveryCustomMessage}
+  * events for exchanging marshalling mappings between nodes in grid.
+  *
+  * In particular it processes two flows:
+  * <ul>
+  *     <li>
+  *         Some node, server or client, wants to add new mapping for some class.
+  *         In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used.
+  *     </li>
+  *     <li>
+  *         As discovery events are delivered to clients asynchronously,
+  *         client node may not have some mapping when server nodes in the grid are already allowed to use the mapping.
+  *         In that situation client sends a {@link MissingMappingRequestMessage} request
+  *         and processor handles it as well as {@link MissingMappingResponseMessage} message.
+  *     </li>
+  * </ul>
+  */
+ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
+     /** */
+     private final MarshallerContextImpl marshallerCtx;
+ 
+     /** */
+     private final GridClosureProcessor closProc;
+ 
+     /** */
+     private final List<MappingUpdatedListener> mappingUpdatedLsnrs = new CopyOnWriteArrayList<>();
+ 
+     /** */
+     private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap
+             = new ConcurrentHashMap8<>();
+ 
+     /** */
+     private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+ 
+     /**
+      * Creates the processor and caches the marshaller context and the closure processor
+      * obtained from the kernal context.
+      *
+      * @param ctx Kernal context.
+      */
+     public GridMarshallerMappingProcessor(GridKernalContext ctx) {
+         super(ctx);
+ 
+         marshallerCtx = ctx.marshallerContext();
+ 
+         closProc = ctx.closure();
+     }
+ 
+     /** {@inheritDoc} */
 -    @Override public void start() throws IgniteCheckedException {
++    @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+         GridDiscoveryManager discoMgr = ctx.discovery();
+         GridIoManager ioMgr = ctx.io();
+ 
+         MarshallerMappingTransport transport = new MarshallerMappingTransport(
+                 ctx,
+                 mappingExchangeSyncMap,
+                 clientReqSyncMap
+         );
+         marshallerCtx.onMarshallerProcessorStarted(ctx, transport);
+ 
+         discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener());
+ 
+         discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener());
+ 
+         if (ctx.clientNode())
+             ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener());
+         else
+             ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr));
+ 
+         if (ctx.clientNode())
+             ctx.event().addLocalEventListener(new GridLocalEventListener() {
+                 @Override public void onEvent(Event evt) {
+                     DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+ 
+                     if (!ctx.isStopping()) {
+                         for (ClientRequestFuture fut : clientReqSyncMap.values())
+                             fut.onNodeLeft(evt0.eventNode().id());
+                     }
+                 }
+             }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+     }
+ 
+     /**
+      * Adds a listener to be notified when mapping changes. Registered listeners are invoked
+      * when a {@link MappingAcceptedMessage} is processed.
+      *
+      * @param mappingUpdatedListener Listener for mapping updated events.
+      */
+     public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) {
+         mappingUpdatedLsnrs.add(mappingUpdatedListener);
+     }
+ 
+     /**
+      * Gets an iterator over all current mappings, as provided by the marshaller context.
+      *
+      * @return Iterator over current mappings.
+      */
+     public Iterator<Map.Entry<Byte, Map<Integer, String>>> currentMappings() {
+         return marshallerCtx.currentMappings();
+     }
+ 
+     /**
+      * Message listener registered on non-client nodes: answers every {@link MissingMappingRequestMessage}
+      * with a {@link MissingMappingResponseMessage} carrying the class name the marshaller context
+      * resolves for the requested platform id and type id.
+      */
+     private final class MissingMappingRequestListener implements GridMessageListener {
+         /** IO manager used to send responses. */
+         private final GridIoManager ioMgr;
+ 
+         /**
+          * @param ioMgr Io manager.
+          */
+         MissingMappingRequestListener(GridIoManager ioMgr) {
+             this.ioMgr = ioMgr;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             assert msg instanceof MissingMappingRequestMessage : msg;
+ 
+             MissingMappingRequestMessage req = (MissingMappingRequestMessage)msg;
+ 
+             byte platformId = req.platformId();
+             int typeId = req.typeId();
+ 
+             String clsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
+ 
+             try {
+                 MissingMappingResponseMessage resp = new MissingMappingResponseMessage(platformId, typeId, clsName);
+ 
+                 ioMgr.send(nodeId, TOPIC_MAPPING_MARSH, resp, SYSTEM_POOL);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to send missing mapping response.", e);
+             }
+         }
+     }
+ 
+     /**
+      * Message listener registered on client nodes: completes the pending client request future
+      * matching a {@link MissingMappingResponseMessage}, successfully when a class name was resolved
+      * and with a failure result otherwise.
+      */
+     private final class MissingMappingResponseListener implements GridMessageListener {
+         /** {@inheritDoc} */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             assert msg instanceof MissingMappingResponseMessage : msg;
+ 
+             MissingMappingResponseMessage resp = (MissingMappingResponseMessage)msg;
+ 
+             byte platformId = resp.platformId();
+             int typeId = resp.typeId();
+             String clsName = resp.className();
+ 
+             // Pending requests are looked up by an item built without a class name.
+             MarshallerMappingItem key = new MarshallerMappingItem(platformId, typeId, null);
+ 
+             GridFutureAdapter<MappingExchangeResult> reqFut = clientReqSyncMap.get(key);
+ 
+             // Nobody is waiting for this mapping.
+             if (reqFut == null)
+                 return;
+ 
+             if (clsName == null) {
+                 reqFut.onDone(MappingExchangeResult.createFailureResult(
+                         new IgniteCheckedException(
+                                 "Failed to resolve mapping [platformId: "
+                                         + platformId
+                                         + ", typeId: "
+                                         + typeId + "]")));
+ 
+                 return;
+             }
+ 
+             marshallerCtx.onMissedMappingResolved(key, clsName);
+ 
+             reqFut.onDone(MappingExchangeResult.createSuccessfulResult(clsName));
+         }
+     }
+ 
+     /**
+      * Discovery listener for {@link MappingProposedMessage}. A proposal that is not yet in conflict is
+      * offered to the marshaller context and marked as duplicated or conflicting according to the answer;
+      * a proposal already in conflict fails the pending future on the node that originated it.
+      */
+     private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> {
+         /** {@inheritDoc} */
+         @Override public void onCustomEvent(
+                 AffinityTopologyVersion topVer,
+                 ClusterNode snd,
+                 MappingProposedMessage msg
+         ) {
+             if (ctx.isStopping() || msg.duplicated())
+                 return;
+ 
+             if (msg.inConflict()) {
+                 // Only the originating node has a future waiting for the outcome.
+                 if (msg.origNodeId().equals(ctx.localNodeId())) {
+                     GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem());
+ 
+                     assert fut != null: msg;
+ 
+                     fut.onDone(MappingExchangeResult.createFailureResult(
+                             duplicateMappingException(msg.mappingItem(), msg.conflictingClassName())));
+                 }
+ 
+                 return;
+             }
+ 
+             MarshallerMappingItem proposed = msg.mappingItem();
+             String existingName = marshallerCtx.onMappingProposed(proposed);
+ 
+             if (existingName == null)
+                 return;
+ 
+             if (existingName.equals(proposed.className()))
+                 msg.markDuplicated();
+             else
+                 msg.conflictingWithClass(existingName);
+         }
+ 
+         /**
+          * Builds the exception reported when a proposed mapping conflicts with an existing one.
+          *
+          * @param mappingItem Mapping item.
+          * @param conflictingClsName Conflicting class name.
+          * @return Exception describing the conflict.
+          */
+         private IgniteCheckedException duplicateMappingException(
+                 MarshallerMappingItem mappingItem,
+                 String conflictingClsName
+         ) {
+             String errMsg = "Duplicate ID [platformId="
+                     + mappingItem.platformId()
+                     + ", typeId="
+                     + mappingItem.typeId()
+                     + ", oldCls="
+                     + conflictingClsName
+                     + ", newCls="
+                     + mappingItem.className() + "]";
+ 
+             return new IgniteCheckedException(errMsg);
+         }
+     }
+ 
+     /**
+      * Discovery listener for {@link MappingAcceptedMessage}: passes the accepted item to the marshaller
+      * context, notifies mapping-updated listeners through the closure processor and completes
+      * the pending exchange future for the item, if there is one.
+      */
+     private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> {
+         /** {@inheritDoc} */
+         @Override public void onCustomEvent(
+                 AffinityTopologyVersion topVer,
+                 ClusterNode snd,
+                 MappingAcceptedMessage msg
+         ) {
+             final MarshallerMappingItem item = msg.getMappingItem();
+             marshallerCtx.onMappingAccepted(item);
+ 
+             // Listeners are invoked via the closure processor rather than inline in this callback.
+             closProc.runLocalSafe(new Runnable() {
+                 @Override public void run() {
+                     for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs)
+                         lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className());
+                 }
+             });
+ 
+             GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
+ 
+             // A future is present only if some caller registered one for this item.
+             if (fut != null)
+                 fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+         // Common data is added at most once per bag.
+         if (dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal()))
+             return;
+ 
+         dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onGridDataReceived(GridDiscoveryData data) {
+         // Common data is expected to be the list added in collectGridNodeData(); the cast cannot be
+         // checked at runtime because of erasure, hence the narrowly scoped suppression.
+         @SuppressWarnings("unchecked")
+         List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>)data.commonData();
+ 
+         if (mappings == null)
+             return;
+ 
+         // The list index is handed over as the first argument (presumably the platform id).
+         for (int i = 0; i < mappings.size(); i++) {
+             Map<Integer, MappedName> map = mappings.get(i);
+ 
+             if (map != null)
+                 marshallerCtx.onMappingDataReceived((byte)i, map);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+         // Completes all pending futures with a failure result wrapping a client-disconnected exception.
+         // NOTE(review): the reconnectFut argument is not used; the exception is built from
+         // ctx.cluster().clientReconnectFuture() instead - confirm both refer to the same future.
+         cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException(
+                 ctx.cluster().clientReconnectFuture(),
+                 "Failed to propose or request mapping, client node disconnected.")));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onKernalStop(boolean cancel) {
+         // Notify the marshaller context first, then complete all pending futures
+         // with the "exchange disabled" result.
+         marshallerCtx.onMarshallerProcessorStop();
+ 
+         cancelFutures(MappingExchangeResult.createExchangeDisabledResult());
+     }
+ 
+     /** {@inheritDoc} */
+     @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+         // Same constant whose ordinal keys the common data in collectGridNodeData().
+         return MARSHALLER_PROC;
+     }
+ 
+     /**
+      * Completes every future currently held in the mapping exchange map and in the client request map
+      * with the given result.
+      *
+      * @param res Result to complete the pending futures with.
+      */
+     private void cancelFutures(MappingExchangeResult res) {
+         for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values())
+             fut.onDone(res);
+ 
+         for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values())
+             fut.onDone(res);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 5fcd2eb,ea8f361..efd1926
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@@ -55,15 -44,12 +55,17 @@@ import org.apache.ignite.configuration.
  import org.apache.ignite.configuration.IgniteConfiguration;
  import org.apache.ignite.configuration.NearCacheConfiguration;
  import org.apache.ignite.configuration.TransactionConfiguration;
 -import org.apache.ignite.internal.binary.*;
 +import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 +import org.apache.ignite.internal.binary.BinaryRawWriterEx;
  import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction;
  import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
+ import org.apache.ignite.internal.processors.platform.plugin.cache.PlatformCachePluginConfiguration;
 -import org.apache.ignite.platform.dotnet.*;
 +import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction;
 +import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration;
 +import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration;
 +import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative;
 +import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration;
+ import org.apache.ignite.plugin.CachePluginConfiguration;
  import org.apache.ignite.spi.communication.CommunicationSpi;
  import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
  import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean;
@@@ -821,19 -865,19 +850,19 @@@ public class PlatformConfigurationUtil
       * Write query entity.
       *
       * @param writer Writer.
--     * @param queryEntity Query entity.
++     * @param qryEntity Query entity.
       */
--    private static void writeQueryEntity(BinaryRawWriter writer, QueryEntity queryEntity) {
--        assert queryEntity != null;
++    private static void writeQueryEntity(BinaryRawWriter writer, QueryEntity qryEntity) {
++        assert qryEntity != null;
  
--        writer.writeString(queryEntity.getKeyType());
--        writer.writeString(queryEntity.getValueType());
++        writer.writeString(qryEntity.getKeyType());
++        writer.writeString(qryEntity.getValueType());
  
          // Fields
--        LinkedHashMap<String, String> fields = queryEntity.getFields();
++        LinkedHashMap<String, String> fields = qryEntity.getFields();
  
          if (fields != null) {
--            Set<String> keyFields = queryEntity.getKeyFields();
++            Set<String> keyFields = qryEntity.getKeyFields();
  
              writer.writeInt(fields.size());
  
@@@ -847,7 -891,7 +876,7 @@@
              writer.writeInt(0);
  
          // Aliases
--        Map<String, String> aliases = queryEntity.getAliases();
++        Map<String, String> aliases = qryEntity.getAliases();
  
          if (aliases != null) {
              writer.writeInt(aliases.size());
@@@ -861,7 -905,7 +890,7 @@@
              writer.writeInt(0);
  
          // Indexes
--        Collection<QueryIndex> indexes = queryEntity.getIndexes();
++        Collection<QueryIndex> indexes = qryEntity.getIndexes();
  
          if (indexes != null) {
              writer.writeInt(indexes.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
index 772813f,fa33d3a..456bb86
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
@@@ -67,13 -65,13 +67,13 @@@ public class CachePluginManager extend
      public CachePluginManager(GridKernalContext ctx, CacheConfiguration cfg) {
          this.ctx = ctx;
          this.cfg = cfg;
 -        
 +
-         for (PluginProvider p : ctx.plugins().allProviders()) {
-             CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg);
+         if (cfg.getPluginConfigurations() != null) {
+             for (CachePluginConfiguration cachePluginCfg : cfg.getPluginConfigurations()) {
+                 CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg, cachePluginCfg);
  
-             CachePluginProvider provider = p.createCacheProvider(pluginCtx);
+                 CachePluginProvider provider = cachePluginCfg.createProvider(pluginCtx);
  
-             if (provider != null) {
                  providersList.add(provider);
                  providersMap.put(pluginCtx, provider);
              }
@@@ -99,9 -97,9 +99,9 @@@
      }
  
      /** {@inheritDoc} */
 -    @Override protected void stop0(boolean cancel) {
 +    @Override protected void stop0(boolean cancel, boolean destroy) {
-         for (ListIterator<CachePluginProvider> iter = providersList.listIterator(); iter.hasPrevious();)
-             iter.previous().stop(cancel);
+         for (int i = providersList.size() - 1; i >= 0; i--)
+             providersList.get(i).stop(cancel);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index cf85a52,0ff6d8b..8b282ab
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@@ -62,9 -62,9 +62,10 @@@ import org.apache.ignite.configuration.
  import org.apache.ignite.events.CacheQueryExecutedEvent;
  import org.apache.ignite.internal.GridKernalContext;
  import org.apache.ignite.internal.IgniteInternalFuture;
 +import org.apache.ignite.internal.NodeStoppingException;
  import org.apache.ignite.internal.binary.BinaryMarshaller;
  import org.apache.ignite.internal.binary.BinaryObjectEx;
+ import org.apache.ignite.internal.binary.BinaryObjectExImpl;
  import org.apache.ignite.internal.processors.GridProcessorAdapter;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@@ -742,83 -685,40 +742,70 @@@ public class GridQueryProcessor extend
          if (log.isDebugEnabled())
              log.debug("Store [space=" + space + ", key=" + key + ", val=" + val + "]");
  
--        CacheObjectContext coctx = null;
- 
-         if (ctx.indexing().enabled()) {
-             coctx = cacheObjectContext(space);
- 
-             Object key0 = unwrap(key, coctx);
- 
-             Object val0 = unwrap(val, coctx);
- 
-             ctx.indexing().store(space, key0, val0, expirationTime);
-         }
--
          if (idx == null)
              return;
  
          if (!busyLock.enterBusy())
 -            return;
 +            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
  
          try {
--            if (coctx == null)
--                coctx = cacheObjectContext(space);
++            CacheObjectContext coctx = cacheObjectContext(space);
  
 -            Class<?> valCls = null;
 +            TypeDescriptor desc = typeByValue(coctx, key, val, true);
  
 -            TypeId id;
 +            if (prevVal != null) {
 +                TypeDescriptor prevValDesc = typeByValue(coctx, key, prevVal, false);
  
 -            boolean binaryVal = ctx.cacheObjects().isBinaryObject(val);
 +                if (prevValDesc != null && prevValDesc != desc)
 +                    idx.remove(space, prevValDesc, key, partId, prevVal, prevVer);
 +            }
  
 -            if (binaryVal) {
 -                int typeId = ctx.cacheObjects().typeId(val);
 +            if (desc == null)
 +                return;
  
 -                id = new TypeId(space, typeId);
 -            }
 -            else {
 -                valCls = val.value(coctx, false).getClass();
 +            idx.store(space, desc, key, partId, val, ver, expirationTime, link);
 +        }
 +        finally {
 +            busyLock.leaveBusy();
 +        }
 +    }
  
 -                id = new TypeId(space, valCls);
 -            }
 +    /**
 +     * @param coctx Cache context.
 +     * @param key Key.
 +     * @param val Value.
 +     * @param checkType If {@code true} checks that key and value type correspond to found TypeDescriptor.
 +     * @return Type descriptor if found.
 +     * @throws IgniteCheckedException If type check failed.
 +     */
 +    @Nullable private TypeDescriptor typeByValue(CacheObjectContext coctx,
 +        KeyCacheObject key,
 +        CacheObject val,
 +        boolean checkType)
 +        throws IgniteCheckedException {
 +        Class<?> valCls = null;
  
 -            TypeDescriptor desc = types.get(id);
 +        TypeId id;
  
 -            if (desc == null || !desc.registered())
 -                return;
 +        boolean binaryVal = ctx.cacheObjects().isBinaryObject(val);
 +
 +        if (binaryVal) {
 +            int typeId = ctx.cacheObjects().typeId(val);
  
 +            id = new TypeId(coctx.cacheName(), typeId);
 +        }
 +        else {
 +            valCls = val.value(coctx, false).getClass();
 +
 +            id = new TypeId(coctx.cacheName(), valCls);
 +        }
 +
 +        TypeDescriptor desc = types.get(id);
 +
 +        if (desc == null || !desc.registered())
 +            return null;
 +
 +        if (checkType) {
              if (!binaryVal && !desc.valueClass().isAssignableFrom(valCls))
                  throw new IgniteCheckedException("Failed to update index due to class name conflict" +
                      "(multiple classes with same simple name are stored in the same cache) " +
@@@ -1143,33 -1039,14 +1124,21 @@@
          if (log.isDebugEnabled())
              log.debug("Remove [space=" + space + ", key=" + key + ", val=" + val + "]");
  
-         CacheObjectContext coctx = null;
- 
-         if (ctx.indexing().enabled()) {
-             coctx = cacheObjectContext(space);
- 
-             Object key0 = unwrap(key, coctx);
- 
-             ctx.indexing().remove(space, key0);
-         }
- 
-         // If val == null we only need to call SPI.
 -        if (idx == null)
 +        if (idx == null || val == null)
              return;
  
          if (!busyLock.enterBusy())
              throw new IllegalStateException("Failed to remove from index (grid is stopping).");
  
          try {
-             if (coctx == null)
-                 coctx = cacheObjectContext(space);
 -            idx.remove(space, key, val);
++            CacheObjectContext coctx = cacheObjectContext(space);
 +
 +            TypeDescriptor desc = typeByValue(coctx, key, val, false);
 +
 +            if (desc == null)
 +                return;
 +
 +            idx.remove(space, desc, key, partId, val, ver);
          }
          finally {
              busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 914c3a3,986fff7..096c531
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -1666,26 -1651,24 +1671,26 @@@ public class GridServiceProcessor exten
                                  onReassignmentFailed(topVer, retries);
                          }
  
 +                        Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
 +
                          // Clean up zombie assignments.
 -                        for (Cache.Entry<Object, Object> e :
 -                            cache.entrySetx(CU.cachePrimary(ctx.grid().affinity(cache.name()), ctx.grid().localNode()))) {
 -                            if (!(e.getKey() instanceof GridServiceAssignmentsKey))
 -                                continue;
 +                        while (it.hasNext()) {
 +                            Cache.Entry<Object, Object> e = it.next();
  
-                             if (cache.context().affinity().primary(ctx.grid().localNode(), e.getKey(), topVer)) {
 -                            String name = ((GridServiceAssignmentsKey)e.getKey()).name();
++                            if (cache.context().affinity().primaryByKey(ctx.grid().localNode(), e.getKey(), topVer)) {
 +                                String name = ((GridServiceAssignmentsKey)e.getKey()).name();
  
 -                            try {
 -                                if (cache.get(new GridServiceDeploymentKey(name)) == null) {
 -                                    if (log.isDebugEnabled())
 -                                        log.debug("Removed zombie assignments: " + e.getValue());
 +                                try {
 +                                    if (cache.get(new GridServiceDeploymentKey(name)) == null) {
 +                                        if (log.isDebugEnabled())
 +                                            log.debug("Removed zombie assignments: " + e.getValue());
  
 -                                    cache.getAndRemove(e.getKey());
 +                                        cache.getAndRemove(e.getKey());
 +                                    }
 +                                }
 +                                catch (IgniteCheckedException ex) {
 +                                    U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
                                  }
 -                            }
 -                            catch (IgniteCheckedException ex) {
 -                                U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
                              }
                          }
                      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 389d058,1cca9d3..d2e0714
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@@ -74,7 -74,7 +74,8 @@@ import java.nio.channels.SelectionKey
  import java.nio.channels.Selector;
  import java.nio.charset.Charset;
  import java.nio.file.Files;
 +import java.nio.file.Path;
+ import java.nio.file.Paths;
  import java.security.AccessController;
  import java.security.KeyManagementException;
  import java.security.MessageDigest;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4b140be,4c89a7c..00ca409
mode 100644,100755..100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 2c86322,7a0b713..1343151
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@@ -5869,14 -5869,15 +5869,14 @@@ public abstract class GridCacheAbstract
          @Override public void run(int idx) throws Exception {
              GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
  
 -            if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
 -                return;
 -
              int size = 0;
  
 +            if (ctx.isNear())
 +                ctx = ctx.near().dht().context();
 +
              for (String key : keys) {
-                 if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+                 if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
 -                    GridCacheEntryEx e =
 -                        ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
 +                    GridCacheEntryEx e = ctx.cache().entryEx(key);
  
                      assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
                      assert !e.deleted() : "Entry is deleted: " + e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index be3933f,e76ab40..691100f
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@@ -411,9 -417,31 +410,28 @@@ public class GridCacheTestEntryEx exten
      }
  
      /** @inheritDoc */
-     @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+     @Override public void clearReserveForLoad(GridCacheVersion ver) {
+         assert false;
+     }
+ 
+     /** @inheritDoc */
+     @Override public EntryGetResult innerGetAndReserveForLoad(
 -        boolean readSwap,
+         boolean updateMetrics,
+         boolean evt,
+         UUID subjId,
+         String taskName,
+         @Nullable IgniteCacheExpiryPolicy expiryPlc,
+         boolean keepBinary,
+         @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException {
+         assert false;
+ 
+         return null;
+     }
+ 
+     /** @inheritDoc */
+     @Nullable @Override public EntryGetResult innerGetVersioned(
          @Nullable GridCacheVersion ver,
          IgniteInternalTx tx,
 -        boolean readSwap,
 -        boolean unmarshal,
          boolean updateMetrics,
          boolean evt,
          UUID subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index b746883,e47a18d..a78d8ef
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@@ -5553,12 -5545,10 +5551,12 @@@ public class IgniteCacheConfigVariation
  
              int size = 0;
  
 +            if (ctx.isNear())
 +                ctx = ctx.near().dht().context();
 +
              for (String key : keys) {
-                 if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+                 if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
 -                    GridCacheEntryEx e =
 -                        ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
 +                    GridCacheEntryEx e = ctx.cache().entryEx(key);
  
                      assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
                      assert !e.deleted() : "Entry is deleted: " + e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------