You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/01 09:25:56 UTC

[13/47] ignite git commit: IGNITE-5729 - IgniteCacheProxy instances from with() methods are not reusable after cache restart

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
new file mode 100644
index 0000000..b94afa1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -0,0 +1,1810 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Configuration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CompletionListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCacheRestartingException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheManager;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.QueryDetailMetrics;
+import org.apache.ignite.cache.query.QueryMetrics;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SpiQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.AsyncSupportAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.CacheQuery;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.GridEmptyIterator;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CX1;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.mxbean.CacheMetricsMXBean;
+import org.apache.ignite.plugin.security.SecurityPermission;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache proxy implementation.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<K, V>>
+    implements IgniteCacheProxy<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Context. */
+    private volatile GridCacheContext<K, V> ctx;
+
+    /** Delegate. */
+    @GridToStringInclude
+    private volatile IgniteInternalCache<K, V> delegate;
+
+    /** */
+    @GridToStringExclude
+    private CacheManager cacheMgr;
+
+    /** Future indicates that cache is under restarting. */
+    private final AtomicReference<GridFutureAdapter<Void>> restartFut;
+
+    /** Flag indicates that proxy is closed. */
+    private volatile boolean closed;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public IgniteCacheProxyImpl() {
+        restartFut = new AtomicReference<GridFutureAdapter<Void>>(null);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param delegate Delegate.
+     * @param async Async support flag.
+     */
+    public IgniteCacheProxyImpl(
+            @NotNull GridCacheContext<K, V> ctx,
+            @NotNull IgniteInternalCache<K, V> delegate,
+            boolean async
+    ) {
+        this(ctx, delegate, new AtomicReference<GridFutureAdapter<Void>>(null), async);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param delegate Delegate.
+     * @param async Async support flag.
+     */
+    private IgniteCacheProxyImpl(
+        @NotNull GridCacheContext<K, V> ctx,
+        @NotNull IgniteInternalCache<K, V> delegate,
+        @NotNull AtomicReference<GridFutureAdapter<Void>> restartFut,
+        boolean async
+    ) {
+        super(async);
+
+        assert ctx != null;
+        assert delegate != null;
+
+        this.ctx = ctx;
+        this.delegate = delegate;
+
+        this.restartFut = restartFut;
+    }
+
+    /**
+     * @return Context.
+     */
+    @Override
+    public GridCacheContext<K, V> context() {
+        return ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCacheProxy<K, V> cacheNoGate() {
+        return new GatewayProtectedCacheProxy<>(this, new CacheOperationContext(), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheMetrics metrics() {
+        return ctx.cache().clusterMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheMetrics metrics(ClusterGroup grp) {
+        return ctx.cache().clusterMetrics(grp);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheMetrics localMetrics() {
+        return ctx.cache().localMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheMetricsMXBean mxBean() {
+        return ctx.cache().clusterMxBean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheMetricsMXBean localMxBean() {
+        return ctx.cache().localMxBean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
+        CacheConfiguration cfg = ctx.config();
+
+        if (!clazz.isAssignableFrom(cfg.getClass()))
+            throw new IllegalArgumentException();
+
+        return clazz.cast(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCache<K, V> withSkipStore() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K1, V1> IgniteCache<K1, V1> withKeepBinary() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCache<K, V> withNoRetries() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCache<K, V> withPartitionRecover() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
+        try {
+            if (isAsync()) {
+                if (ctx.cache().isLocal())
+                    setFuture(ctx.cache().localLoadCacheAsync(p, args));
+                else
+                    setFuture(ctx.cache().globalLoadCacheAsync(p, args));
+            }
+            else {
+                if (ctx.cache().isLocal())
+                    ctx.cache().localLoadCache(p, args);
+                else
+                    ctx.cache().globalLoadCache(p, args);
+            }
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p,
+        @Nullable Object... args) throws CacheException {
+        try {
+            if (ctx.cache().isLocal())
+                return (IgniteFuture<Void>)createFuture(ctx.cache().localLoadCacheAsync(p, args));
+            else
+                return (IgniteFuture<Void>)createFuture(ctx.cache().globalLoadCacheAsync(p, args));
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+   }
+
+    /** {@inheritDoc} */
+    @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
+        try {
+            if (isAsync())
+                setFuture(delegate.localLoadCacheAsync(p, args));
+            else
+                delegate.localLoadCache(p, args);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p,
+        @Nullable Object... args) throws CacheException {
+        return (IgniteFuture<Void>)createFuture(delegate.localLoadCacheAsync(p, args));
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAndPutIfAbsentAsync(key, val));
+
+                return null;
+            }
+            else
+                return delegate.getAndPutIfAbsent(key, val);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException {
+        return createFuture(delegate.getAndPutIfAbsentAsync(key, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Lock lock(K key) throws CacheException {
+        return lockAll(Collections.singleton(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Lock lockAll(final Collection<? extends K> keys) {
+        return new CacheLockImpl<>(ctx.gate(), delegate, new CacheOperationContext(), keys);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
+        return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key);
+    }
+
+    /**
+     * @param scanQry ScanQry.
+     * @param transformer Transformer
+     * @param grp Optional cluster group.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private <T, R> QueryCursor<R> query(
+        final ScanQuery scanQry,
+        @Nullable final IgniteClosure<T, R> transformer,
+        @Nullable ClusterGroup grp)
+        throws IgniteCheckedException {
+
+        final CacheQuery<R> qry;
+
+        CacheOperationContext opCtxCall = ctx.operationContextPerCall();
+
+        boolean isKeepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
+
+        IgniteBiPredicate<K, V> p = scanQry.getFilter();
+
+        qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary);
+
+        if (scanQry.getPageSize() > 0)
+            qry.pageSize(scanQry.getPageSize());
+
+        if (grp != null)
+            qry.projection(grp);
+
+        final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN,
+            ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() {
+                @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
+                    final GridCloseableIterator iter0 = qry.executeScanQuery();
+
+                    final boolean needToConvert = transformer == null;
+
+                    return new GridCloseableIteratorAdapter<R>() {
+                        @Override protected R onNext() throws IgniteCheckedException {
+                            Object next = iter0.nextX();
+
+                            if (needToConvert) {
+                                Map.Entry<K, V> entry = (Map.Entry<K, V>)next;
+
+                                return (R)new CacheEntryImpl<>(entry.getKey(), entry.getValue());
+                            }
+
+                            return (R)next;
+                        }
+
+                        @Override protected boolean onHasNext() throws IgniteCheckedException {
+                            return iter0.hasNextX();
+                        }
+
+                        @Override protected void onClose() throws IgniteCheckedException {
+                            iter0.close();
+                        }
+                    };
+                }
+            }, true);
+
+        return new QueryCursorImpl<>(iter);
+    }
+
+    /**
+     * @param filter Filter.
+     * @param grp Optional cluster group.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+        throws IgniteCheckedException {
+        final CacheQuery qry;
+
+        CacheOperationContext opCtxCall = ctx.operationContextPerCall();
+
+        boolean isKeepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
+
+        final CacheQueryFuture fut;
+
+        if (filter instanceof TextQuery) {
+            TextQuery p = (TextQuery)filter;
+
+            qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(), isKeepBinary);
+
+            if (grp != null)
+                qry.projection(grp);
+
+            fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(), ctx,
+                new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() {
+                        return qry.execute();
+                    }
+                }, false);
+        }
+        else if (filter instanceof SpiQuery) {
+            qry = ctx.queries().createSpiQuery(isKeepBinary);
+
+            if (grp != null)
+                qry.projection(grp);
+
+            fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, filter.getClass().getSimpleName(),
+                ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() {
+                        return qry.execute(((SpiQuery)filter).getArgs());
+                    }
+                }, false);
+        }
+        else {
+            if (filter instanceof SqlFieldsQuery)
+                throw new CacheException("Use methods 'queryFields' and 'localQueryFields' for " +
+                    SqlFieldsQuery.class.getSimpleName() + ".");
+
+            throw new CacheException("Unsupported query type: " + filter);
+        }
+
+        return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K, V>>() {
+            /** */
+            private Cache.Entry<K, V> cur;
+
+            @Override protected Entry<K, V> onNext() throws IgniteCheckedException {
+                if (!onHasNext())
+                    throw new NoSuchElementException();
+
+                Cache.Entry<K, V> e = cur;
+
+                cur = null;
+
+                return e;
+            }
+
+            @Override protected boolean onHasNext() throws IgniteCheckedException {
+                if (cur != null)
+                    return true;
+
+                Object next = fut.next();
+
+                // Workaround a bug: if IndexingSpi is configured future represents Iterator<Cache.Entry>
+                // instead of Iterator<Map.Entry> due to IndexingSpi interface.
+                if (next == null)
+                    return false;
+
+                if (next instanceof Cache.Entry)
+                    cur = (Cache.Entry)next;
+                else {
+                    Map.Entry e = (Map.Entry)next;
+
+                    cur = new CacheEntryImpl(e.getKey(), e.getValue());
+                }
+
+                return true;
+            }
+
+            @Override protected void onClose() throws IgniteCheckedException {
+                fut.cancel();
+            }
+        });
+    }
+
+    /**
+     * @param loc Enforce local.
+     * @return Local node cluster group.
+     */
+    private ClusterGroup projection(boolean loc) {
+        if (loc || ctx.isLocal() || ctx.isReplicatedAffinityNode())
+            return ctx.kernalContext().grid().cluster().forLocal();
+
+        if (ctx.isReplicated())
+            return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom();
+
+        return null;
+    }
+
+    /**
+     * Executes continuous query.
+     *
+     * @param qry Query.
+     * @param loc Local flag.
+     * @param keepBinary Keep binary flag.
+     * @return Initial iteration cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) {
+        if (qry.getInitialQuery() instanceof ContinuousQuery)
+            throw new IgniteException("Initial predicate for continuous query can't be an instance of another " +
+                "continuous query. Use SCAN or SQL query for initial iteration.");
+
+        if (qry.getLocalListener() == null)
+            throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
+
+        if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+            throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
+        try {
+            final UUID routineId = ctx.continuousQueries().executeQuery(
+                qry.getLocalListener(),
+                qry.getRemoteFilter(),
+                qry.getRemoteFilterFactory(),
+                qry.getPageSize(),
+                qry.getTimeInterval(),
+                qry.isAutoUnsubscribe(),
+                loc,
+                keepBinary,
+                qry.isIncludeExpired());
+
+            final QueryCursor<Cache.Entry<K, V>> cur =
+                qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;
+
+            return new QueryCursor<Cache.Entry<K, V>>() {
+                @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                    return cur != null ? cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>();
+                }
+
+                @Override public List<Cache.Entry<K, V>> getAll() {
+                    return cur != null ? cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList();
+                }
+
+                @Override public void close() {
+                    if (cur != null)
+                        cur.close();
+
+                    try {
+                        ctx.kernalContext().continuous().stopRoutine(routineId).get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw U.convertException(e);
+                    }
+                }
+            };
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) {
+        return (FieldsQueryCursor<List<?>>)query((Query)qry);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <R> QueryCursor<R> query(Query<R> qry) {
+        A.notNull(qry, "qry");
+        try {
+            ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+            validate(qry);
+
+            convertToBinary(qry);
+
+            CacheOperationContext opCtxCall = ctx.operationContextPerCall();
+
+            boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
+
+            if (qry instanceof ContinuousQuery)
+                return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal(), keepBinary);
+
+            if (qry instanceof SqlQuery)
+                return (QueryCursor<R>)ctx.kernalContext().query().querySql(ctx, (SqlQuery)qry, keepBinary);
+
+            if (qry instanceof SqlFieldsQuery)
+                return (FieldsQueryCursor<R>)ctx.kernalContext().query().querySqlFields(ctx, (SqlFieldsQuery)qry,
+                    keepBinary);
+
+            if (qry instanceof ScanQuery)
+                return query((ScanQuery)qry, null, projection(qry.isLocal()));
+
+            return (QueryCursor<R>)query(qry, projection(qry.isLocal()));
+        }
+        catch (Exception e) {
+            if (e instanceof CacheException)
+                throw (CacheException)e;
+
+            throw new CacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+        A.notNull(qry, "qry");
+        A.notNull(transformer, "transformer");
+
+        if (!(qry instanceof ScanQuery))
+            throw new UnsupportedOperationException("Transformers are supported only for SCAN queries.");
+
+        try {
+            ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+            validate(qry);
+
+            return query((ScanQuery<K, V>)qry, transformer, projection(qry.isLocal()));
+        }
+        catch (Exception e) {
+            if (e instanceof CacheException)
+                throw (CacheException)e;
+
+            throw new CacheException(e);
+        }
+    }
+
+    /**
+     * Convert query arguments to BinaryObjects if binary marshaller used.
+     *
+     * @param qry Query.
+     */
+    private void convertToBinary(final Query qry) {
+        if (ctx.binaryMarshaller()) {
+            if (qry instanceof SqlQuery) {
+                final SqlQuery sqlQry = (SqlQuery) qry;
+
+                convertToBinary(sqlQry.getArgs());
+            }
+            else if (qry instanceof SpiQuery) {
+                final SpiQuery spiQry = (SpiQuery) qry;
+
+                convertToBinary(spiQry.getArgs());
+            }
+            else if (qry instanceof SqlFieldsQuery) {
+                final SqlFieldsQuery fieldsQry = (SqlFieldsQuery) qry;
+
+                convertToBinary(fieldsQry.getArgs());
+            }
+        }
+    }
+
+    /**
+     * Converts query arguments to BinaryObjects if binary marshaller used.
+     *
+     * @param args Arguments.
+     */
+    private void convertToBinary(final Object[] args) {
+        if (args == null)
+            return;
+
+        for (int i = 0; i < args.length; i++)
+            args[i] = ctx.cacheObjects().binary().toBinary(args[i]);
+    }
+
+    /**
+     * Checks query.
+     *
+     * @param qry Query
+     * @throws CacheException If query indexing disabled for sql query.
+     */
+    private void validate(Query qry) {
+        if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) &&
+            !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery))
+            throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() +
+                ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.");
+
+        if (!ctx.kernalContext().query().moduleEnabled() &&
+            (qry instanceof SqlQuery || qry instanceof SqlFieldsQuery || qry instanceof TextQuery))
+            throw new CacheException("Failed to execute query. Add module 'ignite-indexing' to the classpath " +
+                "of all Ignite nodes.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
+        try {
+            return delegate.localEntries(peekModes);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueryMetrics queryMetrics() {
+        return delegate.context().queries().metrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetQueryMetrics() {
+        delegate.context().queries().resetMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() {
+        return delegate.context().queries().detailMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetQueryDetailMetrics() {
+        delegate.context().queries().resetDetailMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void localEvict(Collection<? extends K> keys) {
+        delegate.evictAll(keys);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
+        try {
+            return delegate.localPeek(key, peekModes, null);
+        }
+        catch (IgniteException | IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size(CachePeekMode... peekModes) throws CacheException {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.sizeAsync(peekModes));
+
+                return 0;
+            }
+            else
+                return delegate.size(peekModes);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException {
+        return createFuture(delegate.sizeAsync(peekModes));
+    }
+
+    /** {@inheritDoc} */
+    @Override public long sizeLong(CachePeekMode... peekModes) throws CacheException {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.sizeLongAsync(peekModes));
+
+                return 0;
+            }
+            else
+                return delegate.sizeLong(peekModes);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException {
+        return createFuture(delegate.sizeLongAsync(peekModes));
+    }
+
+    /** {@inheritDoc} */
+    @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.sizeLongAsync(part, peekModes));
+
+                return 0;
+            }
+            else
+                return delegate.sizeLong(part, peekModes);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Long> sizeLongAsync(int part, CachePeekMode... peekModes) throws CacheException {
+        return createFuture(delegate.sizeLongAsync(part, peekModes));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int localSize(CachePeekMode... peekModes) {
+        try {
+            return delegate.localSize(peekModes);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long localSizeLong(CachePeekMode... peekModes) {
+        try {
+            return delegate.localSizeLong(peekModes);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long localSizeLong(int part, CachePeekMode... peekModes) {
+        try {
+            return delegate.localSizeLong(part, peekModes);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public V get(K key) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAsync(key));
+
+                return null;
+            }
+            else
+                return delegate.get(key);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<V> getAsync(K key) {
+        return createFuture(delegate.getAsync(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEntry<K, V> getEntry(K key) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getEntryAsync(key));
+
+                return null;
+            }
+            else
+                return delegate.getEntry(key);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) {
+        return createFuture(delegate.getEntryAsync(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> getAll(Set<? extends K> keys) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAllAsync(keys));
+
+                return null;
+            }
+            else
+                return delegate.getAll(keys);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) {
+        return createFuture(delegate.getAllAsync(keys));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getEntriesAsync(keys));
+
+                return null;
+            }
+            else
+                return delegate.getEntries(keys);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) {
+        return createFuture(delegate.getEntriesAsync(keys));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAllOutTxAsync(keys));
+
+                return null;
+            }
+            else
+                return delegate.getAllOutTx(keys);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
+        return createFuture(delegate.getAllOutTxAsync(keys));
+    }
+
+    /**
+     * @param keys Keys.
+     * @return Values map.
+     */
+    public Map<K, V> getAll(Collection<? extends K> keys) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAllAsync(keys));
+
+                return null;
+            }
+            else
+                return delegate.getAll(keys);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean containsKey(K key) {
+        if (isAsync()) {
+            setFuture(delegate.containsKeyAsync(key));
+
+            return false;
+        }
+        else
+            return delegate.containsKey(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
+        return createFuture(delegate.containsKeyAsync(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean containsKeys(Set<? extends K> keys) {
+        if (isAsync()) {
+            setFuture(delegate.containsKeysAsync(keys));
+
+            return false;
+        }
+        else
+            return delegate.containsKeys(keys);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) {
+        return createFuture(delegate.containsKeysAsync(keys));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadAll(
+        Set<? extends K> keys,
+        boolean replaceExisting,
+        @Nullable final CompletionListener completionLsnr
+    ) {
+        IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting);
+
+        if (completionLsnr != null) {
+            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> fut) {
+                    try {
+                        fut.get();
+
+                        completionLsnr.onCompletion();
+                    }
+                    catch (IgniteCheckedException e) {
+                        completionLsnr.onException(cacheException(e));
+                    }
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void put(K key, V val) {
+        try {
+            if (isAsync())
+                setFuture(putAsync0(key, val));
+            else
+                delegate.put(key, val);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> putAsync(K key, V val) {
+        return createFuture(putAsync0(key, val));
+    }
+
+    /**
+     * Put async internal operation implementation.
+     *
+     * @param key Key.
+     * @param val Value.
+     * @return Internal future.
+     */
+    private IgniteInternalFuture<Void> putAsync0(K key, V val) {
+        IgniteInternalFuture<Boolean> fut = delegate.putAsync(key, val);
+
+        return fut.chain(new CX1<IgniteInternalFuture<Boolean>, Void>() {
+            @Override public Void applyx(IgniteInternalFuture<Boolean> fut1) throws IgniteCheckedException {
+                try {
+                    fut1.get();
+                }
+                catch (RuntimeException e) {
+                    throw new GridClosureException(e);
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getAndPut(K key, V val) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAndPutAsync(key, val));
+
+                return null;
+            }
+            else
+                return delegate.getAndPut(key, val);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<V> getAndPutAsync(K key, V val) {
+        return createFuture(delegate.getAndPutAsync(key, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void putAll(Map<? extends K, ? extends V> map) {
+        try {
+            if (isAsync())
+                setFuture(delegate.putAllAsync(map));
+            else
+                delegate.putAll(map);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) {
+        return (IgniteFuture<Void>)createFuture(delegate.putAllAsync(map));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean putIfAbsent(K key, V val) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.putIfAbsentAsync(key, val));
+
+                return false;
+            }
+            else
+                return delegate.putIfAbsent(key, val);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) {
+        return createFuture(delegate.putIfAbsentAsync(key, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean remove(K key) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.removeAsync(key));
+
+                return false;
+            }
+            else
+                return delegate.remove(key);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> removeAsync(K key) {
+        return createFuture(delegate.removeAsync(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean remove(K key, V oldVal) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.removeAsync(key, oldVal));
+
+                return false;
+            }
+            else
+                return delegate.remove(key, oldVal);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) {
+        return createFuture(delegate.removeAsync(key, oldVal));
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getAndRemove(K key) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAndRemoveAsync(key));
+
+                return null;
+            }
+            else
+                return delegate.getAndRemove(key);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<V> getAndRemoveAsync(K key) {
+        return createFuture(delegate.getAndRemoveAsync(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean replace(K key, V oldVal, V newVal) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.replaceAsync(key, oldVal, newVal));
+
+                return false;
+            }
+            else
+                return delegate.replace(key, oldVal, newVal);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
+        return createFuture(delegate.replaceAsync(key, oldVal, newVal));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean replace(K key, V val) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.replaceAsync(key, val));
+
+                return false;
+            }
+            else
+                return delegate.replace(key, val);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) {
+        return createFuture(delegate.replaceAsync(key, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getAndReplace(K key, V val) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.getAndReplaceAsync(key, val));
+
+                return null;
+            }
+            else
+                return delegate.getAndReplace(key, val);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) {
+        return createFuture(delegate.getAndReplaceAsync(key, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeAll(Set<? extends K> keys) {
+        try {
+            if (isAsync())
+                setFuture(delegate.removeAllAsync(keys));
+            else
+                delegate.removeAll(keys);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) {
+        return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync(keys));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeAll() {
+        try {
+            if (isAsync())
+                setFuture(delegate.removeAllAsync());
+            else
+                delegate.removeAll();
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> removeAllAsync() {
+        return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear(K key) {
+        try {
+            if (isAsync())
+                setFuture(delegate.clearAsync(key));
+            else
+                delegate.clear(key);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> clearAsync(K key) {
+        return (IgniteFuture<Void>)createFuture(delegate.clearAsync(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clearAll(Set<? extends K> keys) {
+        try {
+            if (isAsync())
+                setFuture(delegate.clearAllAsync(keys));
+            else
+                delegate.clearAll(keys);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys) {
+        return (IgniteFuture<Void>)createFuture(delegate.clearAllAsync(keys));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() {
+        try {
+            if (isAsync())
+                setFuture(delegate.clearAsync());
+            else
+                delegate.clear();
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> clearAsync() {
+        return (IgniteFuture<Void>)createFuture(delegate.clearAsync());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void localClear(K key) {
+        delegate.clearLocally(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void localClearAll(Set<? extends K> keys) {
+        for (K key : keys)
+            delegate.clearLocally(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+        throws EntryProcessorException {
+        try {
+            if (isAsync()) {
+                setFuture(invokeAsync0(key, entryProcessor, args));
+
+                return null;
+            }
+            else {
+                EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args);
+
+                return res != null ? res.get() : null;
+            }
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        return createFuture(invokeAsync0(key, entryProcessor, args));
+    }
+
+    /**
+     * Invoke async operation internal implementation.
+     *
+     * @param key Key.
+     * @param entryProcessor Processor.
+     * @param args Arguments.
+     * @return Internal future.
+     */
+    private <T> IgniteInternalFuture<T> invokeAsync0(K key, EntryProcessor<K, V, T> entryProcessor, Object[] args) {
+        IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args);
+
+        return fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() {
+            @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut1)
+                throws IgniteCheckedException {
+                try {
+                    EntryProcessorResult<T> res = fut1.get();
+
+                    return res != null ? res.get() : null;
+                }
+                catch (RuntimeException e) {
+                    throw new GridClosureException(e);
+                }
+            }
+        });
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args)
+        throws EntryProcessorException {
+        return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        return invokeAsync(key, (EntryProcessor<K, V, T>)entryProcessor, args);
+    }
+
+    /**
+     * @param topVer Locked topology version.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Arguments.
+     * @return Invoke result.
+     */
+    public <T> T invoke(@Nullable AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        try {
+            if (isAsync())
+                throw new UnsupportedOperationException();
+            else {
+                EntryProcessorResult<T> res = delegate.invoke(topVer, key, entryProcessor, args);
+
+                return res != null ? res.get() : null;
+            }
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
+
+                return null;
+            }
+            else
+                return delegate.invokeAll(keys, entryProcessor, args);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor, Object... args) {
+        return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+        CacheEntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
+
+                return null;
+            }
+            else
+                return delegate.invokeAll(keys, entryProcessor, args);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+        CacheEntryProcessor<K, V, T> entryProcessor, Object... args) {
+        return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+        Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+        Object... args) {
+        try {
+            if (isAsync()) {
+                setFuture(delegate.invokeAllAsync(map, args));
+
+                return null;
+            }
+            else
+                return delegate.invokeAll(map, args);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
+        Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) {
+        return createFuture(delegate.invokeAllAsync(map, args));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return delegate.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheManager getCacheManager() {
+        return cacheMgr;
+    }
+
+    /**
+     * @param cacheMgr Cache manager.
+     */
+    public void setCacheManager(CacheManager cacheMgr) {
+        this.cacheMgr = cacheMgr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void destroy() {
+        destroyAsync().get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> destroyAsync() {
+        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), false, true, false));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        closeAsync().get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> closeAsync() {
+        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(ctx.name()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClosed() {
+        return ctx.kernalContext().cache().context().closed(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T unwrap(Class<T> clazz) {
+        if (clazz.isAssignableFrom(getClass()))
+            return (T)this;
+        else if (clazz.isAssignableFrom(IgniteEx.class))
+            return (T)ctx.grid();
+
+        throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
+        try {
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+            ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false, opCtx != null && opCtx.isKeepBinary());
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
+        try {
+            ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<Cache.Entry<K, V>> iterator() {
+        try {
+            return ctx.cache().igniteIterator();
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<K, V> createAsyncInstance() {
+        return new IgniteCacheProxyImpl<K, V>(
+                ctx,
+                delegate,
+                true
+        );
+    }
+
+    /**
+     * Creates projection that will operate with binary objects. <p> Projection returned by this method will force
+     * cache not to deserialize binary objects, so keys and values will be returned from cache API methods without
+     * changes. Therefore, signature of the projection can contain only following types: <ul> <li>{@code BinaryObject}
+     * for binary classes</li> <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li>
+     * <li>Arrays of primitives (byte[], int[], ...)</li> <li>{@link String} and array of {@link String}s</li>
+     * <li>{@link UUID} and array of {@link UUID}s</li> <li>{@link Date} and array of {@link Date}s</li> <li>{@link
+     * java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> <li>Enums and array of enums</li> <li> Maps,
+     * collections and array of objects (but objects inside them will still be converted if they are binary) </li>
+     * </ul> <p> For example, if you use {@link Integer} as a key and {@code Value} class as a value (which will be
+     * stored in binary format), you should acquire following projection to avoid deserialization:
+     * <pre>
+     * IgniteInternalCache<Integer, GridBinaryObject> prj = cache.keepBinary();
+     *
+     * // Value is not deserialized and returned in binary format.
+     * GridBinaryObject po = prj.get(1);
+     * </pre>
+     * <p> Note that this method makes sense only if cache is working in binary mode ({@code
+     * CacheConfiguration#isBinaryEnabled()} returns {@code true}. If not, this method is no-op and will return
+     * current projection.
+     *
+     * @return Projection for binary objects.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <K1, V1> IgniteCache<K1, V1> keepBinary() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @param dataCenterId Data center ID.
+     * @return Projection for data center id.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public IgniteCache<K, V> withDataCenterId(byte dataCenterId) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return Cache with skip store enabled.
+     */
+    @Override
+    public IgniteCache<K, V> skipStore() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Method converts exception to IgniteCacheRestartingException in case of cache restarting
+     * or to CacheException in other cases.
+     *
+     * @param e {@code IgniteCheckedException} or {@code IgniteException}.
+     * @return Cache exception.
+     */
+    private RuntimeException cacheException(Exception e) {
+        GridFutureAdapter<Void> restartFut = this.restartFut.get();
+
+        if (restartFut != null && !restartFut.isDone()) {
+            if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class))
+                throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " +
+                        ctx.name());
+        }
+
+        if (e instanceof IgniteCheckedException)
+            return CU.convertToCacheException((IgniteCheckedException) e);
+
+        if (e instanceof RuntimeException)
+            return (RuntimeException) e;
+
+        throw new IllegalStateException("Unknown exception", e);
+    }
+
+    /**
+     * @param fut Future for async operation.
+     */
+    private <R> void setFuture(IgniteInternalFuture<R> fut) {
+        curFut.set(createFuture(fut));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) {
+        return new IgniteCacheFutureImpl<>(fut);
+    }
+
+    /**
+     * @return Internal proxy.
+     */
+    @Override
+    public GridCacheProxyImpl<K, V> internalProxy() {
+        return new GridCacheProxyImpl<>(ctx, delegate, ctx.operationContextPerCall());
+    }
+
+    /**
+     * @return {@code True} if proxy was closed.
+     */
+    @Override public boolean isProxyClosed() {
+        return closed;
+    }
+
+    /**
+     * Closes this proxy instance.
+     */
+    @Override public void closeProxy() {
+        closed = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> lostPartitions() {
+        return delegate.lostPartitions();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx);
+
+        out.writeObject(delegate);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked"})
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ctx = (GridCacheContext<K, V>)in.readObject();
+
+        delegate = (IgniteInternalCache<K, V>)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> rebalance() {
+        return new IgniteFutureImpl<>(ctx.preloader().forceRebalance());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> indexReadyFuture() {
+        IgniteInternalFuture fut = ctx.shared().database().indexRebuildFuture(ctx.cacheId());
+
+        if (fut == null)
+            return new IgniteFinishedFutureImpl<>();
+
+        return new IgniteFutureImpl<>(fut);
+    }
+
+    /**
+     * Gets value without waiting for toplogy changes.
+     *
+     * @param key Key.
+     * @return Value.
+     */
+    @Override
+    public V getTopologySafe(K key) {
+        try {
+            return delegate.getTopologySafe(key);
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            throw cacheException(e);
+        }
+    }
+
+    /**
+     * Throws {@code IgniteCacheRestartingException} if proxy is restarting.
+     */
+    public void checkRestart() {
+        if (isRestarting())
+            throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut.get()), "Cache is restarting: " +
+                    context().name());
+    }
+
+    /**
+     * @return True if proxy is restarting, false in other case.
+     */
+    public boolean isRestarting() {
+        return restartFut != null && restartFut.get() != null;
+    }
+
+    /**
+     * Restarts this cache proxy.
+     */
+    public void restart() {
+        GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>();
+
+        final GridFutureAdapter<Void> currentFut = this.restartFut.get();
+
+        boolean changed = this.restartFut.compareAndSet(currentFut, restartFut);
+
+        if (changed && currentFut != null)
+            restartFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+                @Override public void apply(IgniteInternalFuture<Void> future) {
+                    if (future.error() != null)
+                        currentFut.onDone(future.error());
+                    else
+                        currentFut.onDone();
+                }
+            });
+    }
+
+    /**
+     * Mark this proxy as restarted.
+     *
+     * @param ctx New cache context.
+     * @param delegate New delegate.
+     */
+    public void onRestarted(GridCacheContext ctx, IgniteInternalCache delegate) {
+        GridFutureAdapter<Void> restartFut = this.restartFut.get();
+
+        assert restartFut != null;
+
+        this.ctx = ctx;
+        this.delegate = delegate;
+
+        restartFut.onDone();
+
+        this.restartFut.compareAndSet(restartFut, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteCacheProxyImpl.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
index 0cec1fe..61ab122 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
@@ -62,7 +62,7 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache
             IgniteLogger log = ctx.log(IgniteDrDataStreamerCacheUpdater.class);
             GridCacheAdapter internalCache = ctx.cache().internalCache(cacheName);
 
-            CacheOperationContext opCtx = ((IgniteCacheProxy)cache0).operationContext();
+            CacheOperationContext opCtx = ((IgniteCacheProxy)cache0).context().operationContextPerCall();
 
             IgniteInternalCache cache =
                 opCtx != null ? new GridCacheProxyImpl(internalCache.context(), internalCache, opCtx) : internalCache;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 6207995..ef914a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -17,6 +17,18 @@
 
 package org.apache.ignite.internal.processors.platform.cache;
 
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
+import javax.cache.integration.CompletionListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -41,8 +53,8 @@ import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformNativeException;
-import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
 import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryProxy;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
@@ -63,19 +75,6 @@ import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
 
-import javax.cache.Cache;
-import javax.cache.integration.CompletionListener;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-
 /**
  * Native cache wrapper implementation.
  */
@@ -1009,7 +1008,7 @@ public class PlatformCache extends PlatformAbstractTarget {
             }
 
             case OP_WITH_NO_RETRIES: {
-                CacheOperationContext opCtx = cache.operationContext();
+                CacheOperationContext opCtx = cache.context().operationContextPerCall();
 
                 if (opCtx != null && opCtx.noRetries())
                     return this;
@@ -1018,7 +1017,9 @@ public class PlatformCache extends PlatformAbstractTarget {
             }
 
             case OP_WITH_SKIP_STORE: {
-                if (cache.delegate().skipStore())
+                CacheOperationContext opCtx = cache.context().operationContextPerCall();
+
+                if (opCtx != null && opCtx.skipStore())
                     return this;
 
                 return copy(rawCache.withSkipStore(), keepBinary);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
index 7affa8c..7005e14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -149,7 +150,15 @@ public class CacheEntryProcessorCopySelfTest extends GridCommonAbstractTest {
                 }
             });
 
-            CacheObject obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(0).peekVisibleValue();
+            GridCacheAdapter ca = (GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate();
+
+            GridCacheEntryEx entry = ca.entryEx(0);
+
+            entry.unswap();
+
+            CacheObject obj = entry.peekVisibleValue();
+
+            ca.context().evicts().touch(entry, AffinityTopologyVersion.NONE);
 
             int actCnt = cnt.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
index f67e247..c53bc4b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
@@ -768,6 +768,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
 
         cache.close();
 
+        // Check second close succeeds without exception.
         cache.close();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 822537c..9376971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -246,6 +247,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
             }
         }
 
+        cfg.setAffinity(new RendezvousAffinityFunction(false, 4096));
         cfg.setCacheMode(cacheMode());
         cfg.setAtomicityMode(atomicityMode());
         cfg.setWriteSynchronizationMode(writeSynchronization());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
index 80404ce..2a90bf6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
@@ -178,7 +178,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst
                 cache.put(key, val);
 
                 CacheObject obj =
-                    ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue();
+                    ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue();
 
                 // Check thar internal entry wasn't changed.
                 assertEquals(i, getValue(obj, cache));
@@ -211,7 +211,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst
 
                 cache.put(key, newTestVal);
 
-                obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue();
+                obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue();
 
                 // Check thar internal entry wasn't changed.
                 assertEquals(-i, getValue(obj, cache));
@@ -290,7 +290,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst
                 });
 
                 CacheObject obj =
-                    ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue();
+                    ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue();
 
                 assertNotEquals(WRONG_VALUE, getValue(obj, cache));
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
index 3c5fe0e..e068252 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
@@ -124,7 +124,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach
                     info("Node is reported as NOT affinity node for key [key=" + key +
                         ", nodeId=" + locNode.id() + ']');
 
-                    if (nearEnabled() && cache == cache0)
+                    if (nearEnabled() && cache.equals(cache0))
                         assertEquals((Integer)i, cache0.localPeek(key));
                     else
                         assertNull(cache0.localPeek(key));
@@ -184,7 +184,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach
                     info("Node is reported as NOT affinity node for key [key=" + key +
                         ", nodeId=" + locNode.id() + ']');
 
-                    if (nearEnabled() && cache == cache0)
+                    if (nearEnabled() && cache.equals(cache0))
                         assertEquals((Integer)i, cache0.localPeek(key));
                     else
                         assertNull(cache0.localPeek(key));

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index dcba92f..8d5462d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,12 +47,13 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import junit.framework.AssertionFailedError;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheEntry;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -83,7 +82,6 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest;
 import org.apache.ignite.transactions.Transaction;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index bcf46fd..e473d52 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -65,7 +65,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index d3269c3..b55e3d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -50,6 +49,7 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
+import com.google.common.collect.Sets;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -78,8 +78,8 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridIterator;
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -2892,10 +2893,10 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
 
         IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() {
             @Override public void run() {
-                try {
-                    int cntr = 0;
+                int cntr = 0;
 
-                    while (!stop.get()) {
+                while (!stop.get()) {
+                    try {
                         ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                         String grp;
@@ -2927,13 +2928,20 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
 
                         node.destroyCache(cache.getName());
                     }
-                }
-                catch (Exception e) {
-                    err.set(true);
+                    catch (Exception e) {
+                        if (X.hasCause(e, CacheStoppedException.class)) {
+                            // Cache operation can be blocked on
+                            // awaiting new topology version and cancelled with CacheStoppedException cause.
 
-                    log.error("Unexpected error(2): " + e, e);
+                            continue;
+                        }
 
-                    stop.set(true);
+                        err.set(true);
+
+                        log.error("Unexpected error(2): " + e, e);
+
+                        stop.set(true);
+                    }
                 }
             }
         }, "cache-destroy-thread");
@@ -3706,7 +3714,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
 
         final AtomicReferenceArray<IgniteCache> caches = new AtomicReferenceArray<>(CACHES);
 
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < CACHES; i++) {
             CacheAtomicityMode atomicityMode = i % 2 == 0 ? ATOMIC : TRANSACTIONAL;
 
             caches.set(i,
@@ -3799,28 +3807,41 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
 
                 IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
                     @Override public void run() {
-                        try {
-                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                            while (!stop.get()) {
+                        while (!stop.get()) {
+                            try {
                                 int idx = rnd.nextInt(CACHES);
 
                                 IgniteCache cache = caches.get(idx);
 
                                 if (cache != null && caches.compareAndSet(idx, cache, null)) {
-                                    for (int i = 0; i < 10; i++)
-                                        cacheOperation(rnd, cache);
-
-                                    caches.set(idx, cache);
+                                    try {
+                                        for (int i = 0; i < 10; i++)
+                                            cacheOperation(rnd, cache);
+                                    }
+                                    catch (Exception e) {
+                                        if (X.hasCause(e, CacheStoppedException.class)) {
+                                            // Cache operation can be blocked on
+                                            // awaiting new topology version and cancelled with CacheStoppedException cause.
+
+                                            continue;
+                                        }
+
+                                        throw e;
+                                    }
+                                    finally {
+                                        caches.set(idx, cache);
+                                    }
                                 }
                             }
-                        }
-                        catch (Exception e) {
-                            err.set(e);
+                            catch (Exception e) {
+                                err.set(e);
 
-                            log.error("Unexpected error: " + e, e);
+                                log.error("Unexpected error: " + e, e);
 
-                            stop.set(true);
+                                stop.set(true);
+                            }
                         }
                     }
                 }, 8, "op-thread");

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
index 7cb9861..25b90c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
@@ -113,7 +113,6 @@ public class IgniteCacheStartStopLoadTest extends GridCommonAbstractTest {
 
                 cache.put(1, obj);
 
-                weakMap.put(((IgniteCacheProxy)cache).delegate(), Boolean.TRUE);
                 weakMap.put(obj, Boolean.TRUE);
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
index 9a49b6c..6493f88 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.MemoryPolicyConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -217,7 +218,7 @@ public class MemoryPolicyInitializationTest extends GridCommonAbstractTest {
      * @param plcName Policy name.
      */
     private void verifyCacheMemoryPolicy(IgniteCache cache, String plcName) {
-        GridCacheContext ctx = U.field(cache, "ctx");
+        GridCacheContext ctx = ((IgniteCacheProxy) cache).context();
 
         assertEquals(plcName, ctx.memoryPolicy().config().getName());
     }