You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/05/27 18:39:27 UTC

[1/2] ignite git commit: IGNITE-5311: Added ability to get CacheObject value without CacheObjectContext. This closes #2019.

Repository: ignite
Updated Branches:
  refs/heads/master 858e5b729 -> aad3b0c53


http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a31263f..8d9d953 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.CacheException;
@@ -100,7 +99,6 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
@@ -132,7 +130,7 @@ public class GridReduceQueryExecutor {
     private final AtomicLong qryIdGen;
 
     /** */
-    private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>();
 
     /** */
     private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
@@ -191,8 +189,8 @@ public class GridReduceQueryExecutor {
             @Override public void onEvent(final Event evt) {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                for (QueryRun r : runs.values()) {
-                    for (GridMergeIndex idx : r.idxs) {
+                for (ReduceQueryRun r : runs.values()) {
+                    for (GridMergeIndex idx : r.indexes()) {
                         if (idx.hasSource(nodeId)) {
                             handleNodeLeft(r, nodeId);
 
@@ -208,7 +206,7 @@ public class GridReduceQueryExecutor {
      * @param r Query run.
      * @param nodeId Left node ID.
      */
-    private void handleNodeLeft(QueryRun r, UUID nodeId) {
+    private void handleNodeLeft(ReduceQueryRun r, UUID nodeId) {
         // Will attempt to retry. If reduce query was started it will fail on next page fetching.
         retry(r, h2.readyTopologyVersion(), nodeId);
     }
@@ -248,7 +246,7 @@ public class GridReduceQueryExecutor {
      * @param msg Message.
      */
     private void onFail(ClusterNode node, GridQueryFailResponse msg) {
-        QueryRun r = runs.get(msg.queryRequestId());
+        ReduceQueryRun r = runs.get(msg.queryRequestId());
 
         fail(r, node.id(), msg.error(), msg.failCode());
     }
@@ -258,7 +256,7 @@ public class GridReduceQueryExecutor {
      * @param nodeId Failed node ID.
      * @param msg Error message.
      */
-    private void fail(QueryRun r, UUID nodeId, String msg, byte failCode) {
+    private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
         if (r != null) {
             CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
 
@@ -278,21 +276,21 @@ public class GridReduceQueryExecutor {
         final int qry = msg.query();
         final int seg = msg.segmentId();
 
-        final QueryRun r = runs.get(qryReqId);
+        final ReduceQueryRun r = runs.get(qryReqId);
 
         if (r == null) // Already finished with error or canceled.
             return;
 
-        final int pageSize = r.pageSize;
+        final int pageSize = r.pageSize();
 
-        GridMergeIndex idx = r.idxs.get(msg.query());
+        GridMergeIndex idx = r.indexes().get(msg.query());
 
         GridResultPage page;
 
         try {
             page = new GridResultPage(ctx, node.id(), msg) {
                 @Override public void fetchNextPage() {
-                    Object errState = r.state.get();
+                    Object errState = r.state();
 
                     if (errState != null) {
                         CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
@@ -335,7 +333,7 @@ public class GridReduceQueryExecutor {
         if (msg.retry() != null)
             retry(r, msg.retry(), node.id());
         else if (msg.page() == 0) // Do count down on each first page received.
-            r.latch.countDown();
+            r.latch().countDown();
     }
 
     /**
@@ -343,7 +341,7 @@ public class GridReduceQueryExecutor {
      * @param retryVer Retry version.
      * @param nodeId Node ID.
      */
-    private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+    private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
         r.state(retryVer, nodeId);
     }
 
@@ -501,7 +499,7 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param keepPortable Keep portable.
      * @param enforceJoinOrder Enforce join order of tables.
@@ -512,7 +510,7 @@ public class GridReduceQueryExecutor {
      * @return Rows iterator.
      */
     public Iterator<List<?>> query(
-        GridCacheContext<?, ?> cctx,
+        String schemaName,
         GridCacheTwoStepQuery qry,
         boolean keepPortable,
         boolean enforceJoinOrder,
@@ -541,10 +539,8 @@ public class GridReduceQueryExecutor {
 
             final long qryReqId = qryIdGen.incrementAndGet();
 
-            final String cacheName = cctx.name();
-
-            final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), cacheName,
-                h2.connectionForCache(cacheName), qry.mapQueries().size(), qry.pageSize(),
+            final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
+                h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(),
                 U.currentTimeMillis(), cancel);
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
@@ -633,7 +629,7 @@ public class GridReduceQueryExecutor {
                     GridMergeTable tbl;
 
                     try {
-                        tbl = createMergeTable(r.conn, mapQry, qry.explain());
+                        tbl = createMergeTable(r.connection(), mapQry, qry.explain());
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);
@@ -641,7 +637,7 @@ public class GridReduceQueryExecutor {
 
                     idx = tbl.getMergeIndex();
 
-                    fakeTable(r.conn, tblIdx++).innerTable(tbl);
+                    fakeTable(r.connection(), tblIdx++).innerTable(tbl);
                 }
                 else
                     idx = GridMergeIndexUnsorted.createDummy(ctx);
@@ -659,13 +655,13 @@ public class GridReduceQueryExecutor {
                 else
                     idx.setSources(nodes, segmentsPerIndex);
 
-                idx.setPageSize(r.pageSize);
+                idx.setPageSize(r.pageSize());
 
-                r.idxs.add(idx);
+                r.indexes().add(idx);
             }
 
-            r.latch = new CountDownLatch(isReplicatedOnly ? 1 :
-                (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
+            r.latch(new CountDownLatch(isReplicatedOnly ? 1 :
+                (r.indexes().size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt));
 
             runs.put(qryReqId, r);
 
@@ -719,7 +715,7 @@ public class GridReduceQueryExecutor {
                         new GridH2QueryRequest()
                                 .requestId(qryReqId)
                                 .topologyVersion(topVer)
-                                .pageSize(r.pageSize)
+                                .pageSize(r.pageSize())
                                 .caches(qry.cacheIds())
                                 .tables(distributedJoins ? qry.tables() : null)
                                 .partitions(convert(partsMap))
@@ -731,7 +727,7 @@ public class GridReduceQueryExecutor {
                         false)) {
                     awaitAllReplies(r, nodes, cancel);
 
-                    Object state = r.state.get();
+                    Object state = r.state();
 
                     if (state != null) {
                         if (state instanceof CacheException) {
@@ -764,7 +760,7 @@ public class GridReduceQueryExecutor {
                         List<List<?>> res = new ArrayList<>();
 
                         // Simple UNION ALL can have multiple indexes.
-                        for (GridMergeIndex idx : r.idxs) {
+                        for (GridMergeIndex idx : r.indexes()) {
                             Cursor cur = idx.findInStream(null, null);
 
                             while (cur.next()) {
@@ -788,21 +784,19 @@ public class GridReduceQueryExecutor {
 
                         UUID locNodeId = ctx.localNodeId();
 
-                        H2Utils.setupConnection(r.conn, false, enforceJoinOrder);
+                        H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
 
                         GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
-                            .pageSize(r.pageSize).distributedJoinMode(OFF));
+                            .pageSize(r.pageSize()).distributedJoinMode(OFF));
 
                         try {
-                            String schema = h2.schema(cacheName);
-
                             if (qry.explain())
-                                return explainPlan(r.conn, schema, qry, params);
+                                return explainPlan(r.connection(), schemaName, qry, params);
 
                             GridCacheSqlQuery rdc = qry.reduceQuery();
 
-                            ResultSet res = h2.executeSqlQueryWithTimer(schema,
-                                r.conn,
+                            ResultSet res = h2.executeSqlQueryWithTimer(schemaName,
+                                r.connection(),
                                 rdc.query(),
                                 F.asList(rdc.parameters(params)),
                                 false, // The statement will cache some extra thread local objects.
@@ -824,10 +818,10 @@ public class GridReduceQueryExecutor {
                     continue;
                 }
 
-                return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
+                return new GridQueryCacheObjectsIterator(resIter, h2.valueContext(), keepPortable);
             }
             catch (IgniteCheckedException | RuntimeException e) {
-                U.closeQuiet(r.conn);
+                U.closeQuiet(r.connection());
 
                 if (e instanceof CacheException) {
                     if (wasCancelled((CacheException)e))
@@ -898,7 +892,7 @@ public class GridReduceQueryExecutor {
      * @param distributedJoins Distributed join flag.
      */
     private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes,
-        QueryRun r,
+        ReduceQueryRun r,
         long qryReqId,
         boolean distributedJoins)
     {
@@ -906,7 +900,7 @@ public class GridReduceQueryExecutor {
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
         else {
-            for (GridMergeIndex idx : r.idxs) {
+            for (GridMergeIndex idx : r.indexes()) {
                 if (!idx.fetchedAll()) {
                     send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
 
@@ -922,9 +916,9 @@ public class GridReduceQueryExecutor {
      * @param cancel Query cancel.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+    private void awaitAllReplies(ReduceQueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
         throws IgniteInterruptedCheckedException, QueryCancelledException {
-        while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+        while (!U.await(r.latch(), 500, TimeUnit.MILLISECONDS)) {
 
             cancel.checkCancelled();
 
@@ -932,7 +926,7 @@ public class GridReduceQueryExecutor {
                 if (!ctx.discovery().alive(node)) {
                     handleNodeLeft(r, node.id());
 
-                    assert r.latch.getCount() == 0;
+                    assert r.latch().getCount() == 0;
 
                     return;
                 }
@@ -1420,7 +1414,7 @@ public class GridReduceQueryExecutor {
         CacheException err = new CacheException("Query was cancelled, client node disconnected.",
             new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
 
-        for (Map.Entry<Long, QueryRun> e : runs.entrySet())
+        for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
             e.getValue().disconnected(err);
     }
 
@@ -1435,9 +1429,9 @@ public class GridReduceQueryExecutor {
 
         long curTime = U.currentTimeMillis();
 
-        for (QueryRun run : runs.values()) {
-            if (run.qry.longQuery(curTime, duration))
-                res.add(run.qry);
+        for (ReduceQueryRun run : runs.values()) {
+            if (run.queryInfo().longQuery(curTime, duration))
+                res.add(run.queryInfo());
         }
 
         return res;
@@ -1450,77 +1444,10 @@ public class GridReduceQueryExecutor {
      */
     public void cancelQueries(Collection<Long> queries) {
         for (Long qryId : queries) {
-            QueryRun run = runs.get(qryId);
+            ReduceQueryRun run = runs.get(qryId);
 
             if (run != null)
-                run.qry.cancel();
-        }
-    }
-
-    /**
-     * Query run.
-     */
-    private static class QueryRun {
-        /** */
-        private final GridRunningQueryInfo qry;
-
-        /** */
-        private final List<GridMergeIndex> idxs;
-
-        /** */
-        private CountDownLatch latch;
-
-        /** */
-        private final JdbcConnection conn;
-
-        /** */
-        private final int pageSize;
-
-        /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
-        private final AtomicReference<Object> state = new AtomicReference<>();
-
-        /**
-         * @param id Query ID.
-         * @param qry Query text.
-         * @param cache Cache where query was executed.
-         * @param conn Connection.
-         * @param idxsCnt Number of indexes.
-         * @param pageSize Page size.
-         * @param startTime Start time.
-         * @param cancel Query cancel handler.
-         */
-        private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
-            this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
-            this.conn = (JdbcConnection)conn;
-            this.idxs = new ArrayList<>(idxsCnt);
-            this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
-        }
-
-        /**
-         * @param o Fail state object.
-         * @param nodeId Node ID.
-         */
-        void state(Object o, @Nullable UUID nodeId) {
-            assert o != null;
-            assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
-
-            if (!state.compareAndSet(null, o))
-                return;
-
-            while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
-                latch.countDown();
-
-            CacheException e = o instanceof CacheException ? (CacheException) o : null;
-
-            for (GridMergeIndex idx : idxs) // Fail all merge indexes.
-                idx.fail(nodeId, e);
-        }
-
-        /**
-         * @param e Error.
-         */
-        void disconnected(CacheException e) {
-            state(e, null);
+                run.queryInfo().cancel();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
new file mode 100644
index 0000000..73bb002
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.h2.jdbc.JdbcConnection;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.CacheException;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
+/**
+ * State of a single reduce query run: query info, merge indexes, reply latch, connection and fail/retry state.
+ */
+class ReduceQueryRun {
+    /** Running query info built in the constructor from the query ID, text, schema name, start time and cancel. */
+    private final GridRunningQueryInfo qry;
+
+    /** Merge indexes of this run; every one of them is failed once a state is set. */
+    private final List<GridMergeIndex> idxs;
+
+    /** Latch installed via {@link #latch(CountDownLatch)}; drained to zero once a state is set. */
+    private CountDownLatch latch;
+
+    /** Connection passed to the constructor, cast to {@link JdbcConnection}. */
+    private final JdbcConnection conn;
+
+    /** Page size; {@link GridCacheTwoStepQuery#DFLT_PAGE_SIZE} if a non-positive value was passed. */
+    private final int pageSize;
+
+    /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
+    private final AtomicReference<Object> state = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param id Query ID.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param conn Connection; must be a {@link JdbcConnection} because it is cast unconditionally.
+     * @param idxsCnt Number of indexes (used as the initial capacity of the index list).
+     * @param pageSize Page size; a non-positive value selects {@link GridCacheTwoStepQuery#DFLT_PAGE_SIZE}.
+     * @param startTime Start time.
+     * @param cancel Query cancel handler.
+     */
+    ReduceQueryRun(Long id, String qry, String schemaName, Connection conn, int idxsCnt, int pageSize, long startTime,
+        GridQueryCancel cancel) {
+        this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, schemaName, startTime, cancel, false);
+
+        this.conn = (JdbcConnection)conn;
+
+        this.idxs = new ArrayList<>(idxsCnt);
+
+        this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+    }
+
+    /**
+     * @param o State object: a {@link CacheException} or an {@link AffinityTopologyVersion}; must not be {@code null}.
+     * @param nodeId Node ID forwarded to {@code GridMergeIndex.fail}; may be {@code null}.
+     */
+    void state(Object o, @Nullable UUID nodeId) {
+        assert o != null;
+        assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
+
+        if (!state.compareAndSet(null, o)) // Only the first state set wins; later calls are no-ops.
+            return;
+
+        while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
+            latch.countDown();
+
+        CacheException e = o instanceof CacheException ? (CacheException) o : null; // null for a retry version.
+
+        for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+            idx.fail(nodeId, e);
+    }
+
+    /**
+     * @param e Error to set as the run state; the merge indexes are failed with a {@code null} node ID.
+     */
+    void disconnected(CacheException e) {
+        state(e, null);
+    }
+
+    /**
+     * @return Query info.
+     */
+    GridRunningQueryInfo queryInfo() {
+        return qry;
+    }
+
+    /**
+     * @return Page size.
+     */
+    int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return Connection.
+     */
+    JdbcConnection connection() {
+        return conn;
+    }
+
+    /**
+     * @return Current state: {@code null}, a {@link CacheException} or an {@link AffinityTopologyVersion}.
+     */
+    Object state() {
+        return state.get();
+    }
+
+    /**
+     * @return Indexes (the internal mutable list itself, not a copy).
+     */
+    List<GridMergeIndex> indexes() {
+        return idxs;
+    }
+
+    /**
+     * @return Latch, or {@code null} if {@link #latch(CountDownLatch)} has not been called yet.
+     */
+    CountDownLatch latch() {
+        return latch;
+    }
+
+    /**
+     * @param latch Latch.
+     */
+    void latch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 5ac02a5..1f73dcb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
@@ -726,7 +727,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
             return (T)val;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index 5939b59..b66a343 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -870,7 +870,9 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
 
         IgniteH2Indexing idx = U.field(qryProcessor, "idx");
 
-        return (JdbcConnection)idx.connectionForCache(DEFAULT_CACHE_NAME);
+        String schemaName = idx.schema(DEFAULT_CACHE_NAME);
+
+        return (JdbcConnection)idx.connectionForSchema(schemaName);
     }
 
     /**


[2/2] ignite git commit: IGNITE-5311: Added ability to get CacheObject value without CacheObjectContext. This closes #2019.

Posted by vo...@apache.org.
IGNITE-5311: Added ability to get CacheObject value without CacheObjectContext. This closes #2019.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aad3b0c5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aad3b0c5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aad3b0c5

Branch: refs/heads/master
Commit: aad3b0c536e3f7b0836d31daa63cd6d6137675d5
Parents: 858e5b7
Author: devozerov <vo...@gridgain.com>
Authored: Sat May 27 21:39:08 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Sat May 27 21:39:08 2017 +0300

----------------------------------------------------------------------
 .../internal/binary/BinaryEnumObjectImpl.java   |   5 +-
 .../internal/binary/BinaryObjectImpl.java       |  18 +-
 .../binary/BinaryObjectOffheapImpl.java         |   3 +-
 .../internal/processors/cache/CacheObject.java  |   2 +-
 .../processors/cache/CacheObjectAdapter.java    |   4 +-
 .../cache/CacheObjectByteArrayImpl.java         |   2 +-
 .../processors/cache/CacheObjectContext.java    | 197 ++-----------------
 .../processors/cache/CacheObjectImpl.java       |  25 ++-
 .../processors/cache/CacheObjectUtils.java      | 173 ++++++++++++++++
 .../cache/CacheObjectValueContext.java          |  50 +++++
 .../processors/cache/GridCacheContext.java      |   4 +-
 .../processors/cache/GridCacheEventManager.java |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |   4 +-
 .../processors/cache/KeyCacheObjectImpl.java    |   4 +-
 .../cache/binary/CacheObjectBinaryContext.java  |   6 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |   5 +-
 .../cache/database/CacheDataRowAdapter.java     |   8 +-
 .../cache/distributed/near/GridNearTxLocal.java |   3 +-
 .../cache/transactions/TxDeadlock.java          |   7 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |   6 +-
 .../IgniteCacheObjectProcessorImpl.java         |  39 ++--
 .../query/CacheQueryObjectValueContext.java     |  64 ++++++
 .../query/GridQueryCacheObjectsIterator.java    |  16 +-
 .../processors/query/GridQueryProcessor.java    |  14 +-
 .../processors/query/GridRunningQueryInfo.java  |  16 +-
 .../query/VisorRunningQueriesCollectorTask.java |   2 +-
 .../internal/GridAffinityNoCacheSelfTest.java   |   3 +-
 .../IgniteIncompleteCacheObjectSelfTest.java    |   2 +-
 .../database/FreeListImplSelfTest.java          |   3 +-
 .../query/h2/DmlStatementsProcessor.java        |   4 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  44 +++--
 .../query/h2/opt/GridLuceneIndex.java           |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 159 ++++-----------
 .../query/h2/twostep/ReduceQueryRun.java        | 157 +++++++++++++++
 .../h2/GridIndexingSpiAbstractSelfTest.java     |   3 +-
 .../query/h2/sql/GridQueryParsingTest.java      |   4 +-
 37 files changed, 655 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
index f889e45..6a1ad6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -280,7 +281,7 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
         return deserialize();
     }
 
@@ -335,7 +336,7 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.processor()).binaryContext();
+        this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects()).binaryContext();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 16e5ccd..d0d0699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -135,7 +136,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
         Object obj0 = obj;
 
         if (obj0 == null || (cpy && needCopy(ctx)))
@@ -188,7 +189,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.processor()).binaryContext();
+        this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects()).binaryContext();
     }
 
     /** {@inheritDoc} */
@@ -787,9 +788,9 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
      * @param coCtx CacheObjectContext.
      * @return Object.
      */
-    private Object deserializeValue(@Nullable CacheObjectContext coCtx) {
-        BinaryReaderExImpl reader = reader(null,
-            coCtx != null ? coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader(), true);
+    private Object deserializeValue(@Nullable CacheObjectValueContext coCtx) {
+        BinaryReaderExImpl reader = reader(null, coCtx != null ?
+            coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader(), true);
 
         Object obj0 = reader.deserialize();
 
@@ -807,8 +808,8 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
      * @param ctx Context.
      * @return {@code True} need to copy value returned to user.
      */
-    private boolean needCopy(CacheObjectContext ctx) {
-        return ctx.copyOnGet() && obj != null && !ctx.processor().immutable(obj);
+    private boolean needCopy(CacheObjectValueContext ctx) {
+        return ctx.copyOnGet() && obj != null && !ctx.kernalContext().cacheObjects().immutable(obj);
     }
 
     /**
@@ -819,7 +820,8 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
      * @param forUnmarshal {@code True} if reader is need to unmarshal object.
      * @return Reader.
      */
-    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr, boolean forUnmarshal) {
+    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr,
+        boolean forUnmarshal) {
         if (ldr == null)
             ldr = ctx.configuration().getClassLoader();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
index bdf0ce1..0a0a7b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -444,7 +445,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
         return (T)deserializeValue();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index c226ba2..8faaa03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -43,7 +43,7 @@ public interface CacheObject extends Message {
      * @param cpy If {@code true} need to copy value.
      * @return Value.
      */
-    @Nullable public <T> T value(CacheObjectContext ctx, boolean cpy);
+    @Nullable public <T> T value(CacheObjectValueContext ctx, boolean cpy);
 
     /**
      * @param ctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index 6af38ac..e2a15ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -50,8 +50,8 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
      * @param ctx Context.
      * @return {@code True} need to copy value returned to user.
      */
-    protected boolean needCopy(CacheObjectContext ctx) {
-        return ctx.copyOnGet() && val != null && !ctx.processor().immutable(val);
+    protected boolean needCopy(CacheObjectValueContext ctx) {
+        return ctx.copyOnGet() && val != null && !ctx.kernalContext().cacheObjects().immutable(val);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
index fe284ae..6a13f8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
@@ -62,7 +62,7 @@ public class CacheObjectByteArrayImpl implements CacheObject, Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
         if (cpy)
             return (T)Arrays.copyOf(val, val.length);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
index a777ab6..655a3e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
@@ -27,21 +27,17 @@ import java.util.Map;
 import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryUtils;
-import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
 @SuppressWarnings("TypeMayBeWeakened")
-public class CacheObjectContext {
+public class CacheObjectContext implements CacheObjectValueContext {
     /** */
     private GridKernalContext kernalCtx;
 
     /** */
-    private IgniteCacheObjectProcessor proc;
-
-    /** */
     private String cacheName;
 
     /** */
@@ -54,9 +50,6 @@ public class CacheObjectContext {
     private boolean storeVal;
 
     /** */
-    private boolean p2pEnabled;
-
-    /** */
     private boolean addDepInfo;
 
     /**
@@ -78,9 +71,6 @@ public class CacheObjectContext {
         this.cpyOnGet = cpyOnGet;
         this.storeVal = storeVal;
         this.addDepInfo = addDepInfo;
-
-        p2pEnabled = kernalCtx.config().isPeerClassLoadingEnabled();
-        proc = kernalCtx.cacheObjects();
     }
 
     /**
@@ -90,31 +80,18 @@ public class CacheObjectContext {
         return cacheName;
     }
 
-    /**
-     * @return {@code True} if peer class loading is enabled.
-     */
-    public boolean p2pEnabled() {
-        return p2pEnabled;
-    }
-
-    /**
-     * @return {@code True} if deployment info should be associated with the objects of this cache.
-     */
-    public boolean addDeploymentInfo() {
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
         return addDepInfo;
     }
 
-    /**
-     * @return Copy on get flag.
-     */
-    public boolean copyOnGet() {
+    /** {@inheritDoc} */
+    @Override public boolean copyOnGet() {
         return cpyOnGet;
     }
 
-    /**
-     * @return {@code True} if should store unmarshalled value in cache.
-     */
-    public boolean storeValue() {
+    /** {@inheritDoc} */
+    @Override public boolean storeValue() {
         return storeVal;
     }
 
@@ -125,27 +102,14 @@ public class CacheObjectContext {
         return dfltAffMapper;
     }
 
-    /**
-     * @return Kernal context.
-     */
-    public GridKernalContext kernalContext() {
+    /** {@inheritDoc} */
+    @Override public GridKernalContext kernalContext() {
         return kernalCtx;
     }
 
-    /**
-     * @return Processor.
-     */
-    public IgniteCacheObjectProcessor processor() {
-        return proc;
-    }
-
-    /**
-     * @param o Object to unwrap.
-     * @param keepBinary Keep binary flag.
-     * @return Unwrapped object.
-     */
-    public Object unwrapBinaryIfNeeded(Object o, boolean keepBinary) {
-        return unwrapBinaryIfNeeded(o, keepBinary, true);
+    /** {@inheritDoc} */
+    @Override public boolean binaryEnabled() {
+        return false;
     }
 
     /**
@@ -158,141 +122,6 @@ public class CacheObjectContext {
         if (o == null)
             return null;
 
-        return unwrapBinary(o, keepBinary, cpy);
-    }
-
-    /**
-     * @param col Collection of objects to unwrap.
-     * @param keepBinary Keep binary flag.
-     * @return Unwrapped collection.
-     */
-    public Collection<Object> unwrapBinariesIfNeeded(Collection<Object> col, boolean keepBinary) {
-        return unwrapBinariesIfNeeded(col, keepBinary, true);
-    }
-
-    /**
-     * @param col Collection to unwrap.
-     * @param keepBinary Keep binary flag.
-     * @param cpy Copy value flag.
-     * @return Unwrapped collection.
-     */
-    public Collection<Object> unwrapBinariesIfNeeded(Collection<Object> col, boolean keepBinary, boolean cpy) {
-        Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
-
-        if (col0 == null)
-            col0 = new ArrayList<>(col.size());
-
-        for (Object obj : col)
-            col0.add(unwrapBinary(obj, keepBinary, cpy));
-
-        return col0;
-    }
-
-    /**
-     * @param col Collection to unwrap.
-     * @param keepBinary Keep binary flag.
-     * @param cpy Copy flag.
-     * @return Unwrapped collection.
-     */
-    private Collection<Object> unwrapKnownCollection(Collection<Object> col, boolean keepBinary, boolean cpy) {
-        Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
-
-        for (Object obj : col)
-            col0.add(unwrapBinary(obj, keepBinary, cpy));
-
-        return col0;
-    }
-
-    /**
-     * Unwrap array of binaries if needed.
-     *
-     * @param arr Array.
-     * @param keepBinary Keep binary flag.
-     * @param cpy Copy.
-     * @return Result.
-     */
-    public Object[] unwrapBinariesInArrayIfNeeded(Object[] arr, boolean keepBinary, boolean cpy) {
-        if (BinaryUtils.knownArray(arr))
-            return arr;
-
-        Object[] res = new Object[arr.length];
-
-        for (int i = 0; i < arr.length; i++)
-            res[i] = unwrapBinary(arr[i], keepBinary, cpy);
-
-        return res;
-    }
-
-    /**
-     * Unwraps map.
-     *
-     * @param map Map to unwrap.
-     * @param keepBinary Keep binary flag.
-     * @return Unwrapped collection.
-     */
-    private Map<Object, Object> unwrapBinariesIfNeeded(Map<Object, Object> map, boolean keepBinary, boolean cpy) {
-        if (keepBinary)
-            return map;
-
-        Map<Object, Object> map0 = BinaryUtils.newMap(map);
-
-        for (Map.Entry<Object, Object> e : map.entrySet())
-            map0.put(unwrapBinary(e.getKey(), keepBinary, cpy), unwrapBinary(e.getValue(), keepBinary, cpy));
-
-        return map0;
-    }
-
-    /**
-     * @param o Object to unwrap.
-     * @return Unwrapped object.
-     */
-    private Object unwrapBinary(Object o, boolean keepBinary, boolean cpy) {
-        if (o instanceof Map.Entry) {
-            Map.Entry entry = (Map.Entry)o;
-
-            Object key = entry.getKey();
-
-            Object uKey = unwrapBinary(key, keepBinary, cpy);
-
-            Object val = entry.getValue();
-
-            Object uVal = unwrapBinary(val, keepBinary, cpy);
-
-            return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
-        }
-        else if (BinaryUtils.knownCollection(o))
-            return unwrapKnownCollection((Collection<Object>)o, keepBinary, cpy);
-        else if (BinaryUtils.knownMap(o))
-            return unwrapBinariesIfNeeded((Map<Object, Object>)o, keepBinary, cpy);
-        else if (o instanceof Object[])
-            return unwrapBinariesInArrayIfNeeded((Object[])o, keepBinary, cpy);
-        else if (o instanceof CacheObject) {
-            CacheObject co = (CacheObject)o;
-
-            if (!keepBinary || co.isPlatformType())
-                return unwrapBinary(co.value(this, cpy), keepBinary, cpy);
-        }
-
-        return o;
-    }
-
-    /**
-     * @param o Object to test.
-     * @return True if collection should be recursively unwrapped.
-     */
-    private boolean knownCollection(Object o) {
-        Class<?> cls = o == null ? null : o.getClass();
-
-        return cls == ArrayList.class || cls == LinkedList.class || cls == HashSet.class;
-    }
-
-    /**
-     * @param o Object to test.
-     * @return True if map should be recursively unwrapped.
-     */
-    private boolean knownMap(Object o) {
-        Class<?> cls = o == null ? null : o.getClass();
-
-        return cls == HashMap.class || cls == LinkedHashMap.class;
+        return CacheObjectUtils.unwrapBinaryIfNeeded(this, o, keepBinary, cpy);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
index 7fe4297..76f354a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -53,27 +55,31 @@ public class CacheObjectImpl extends CacheObjectAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
         cpy = cpy && needCopy(ctx);
 
         try {
+            GridKernalContext kernalCtx = ctx.kernalContext();
+
+            IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
             if (cpy) {
                 if (valBytes == null) {
                     assert val != null;
 
-                    valBytes = ctx.processor().marshal(ctx, val);
+                    valBytes = proc.marshal(ctx, val);
                 }
 
                 ClassLoader clsLdr;
 
                 if (val != null)
                     clsLdr = val.getClass().getClassLoader();
-                else if (ctx.kernalContext().config().isPeerClassLoadingEnabled())
-                    clsLdr = ctx.kernalContext().cache().context().deploy().globalLoader();
+                else if (kernalCtx.config().isPeerClassLoadingEnabled())
+                    clsLdr = kernalCtx.cache().context().deploy().globalLoader();
                 else
                     clsLdr = null;
 
-                return (T)ctx.processor().unmarshal(ctx, valBytes, clsLdr);
+                return (T)proc.unmarshal(ctx, valBytes, clsLdr);
             }
 
             if (val != null)
@@ -81,9 +87,8 @@ public class CacheObjectImpl extends CacheObjectAdapter {
 
             assert valBytes != null;
 
-            Object val = ctx.processor().unmarshal(ctx, valBytes,
-                ctx.kernalContext().config().isPeerClassLoadingEnabled() ?
-                    ctx.kernalContext().cache().context().deploy().globalLoader() : null);
+            Object val = proc.unmarshal(ctx, valBytes, kernalCtx.config().isPeerClassLoadingEnabled() ?
+                kernalCtx.cache().context().deploy().globalLoader() : null);
 
             if (ctx.storeValue())
                 this.val = val;
@@ -98,7 +103,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
     /** {@inheritDoc} */
     @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException {
         if (valBytes == null)
-            valBytes = ctx.processor().marshal(ctx, val);
+            valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
 
         return valBytes;
     }
@@ -116,7 +121,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
         assert val != null || valBytes != null;
 
         if (val == null && ctx.storeValue())
-            val = ctx.processor().unmarshal(ctx, valBytes, ldr);
+            val = ctx.kernalContext().cacheObjects().unmarshal(ctx, valBytes, ldr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
new file mode 100644
index 0000000..f9c76df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Cache object utility methods.
+ */
+public class CacheObjectUtils {
+    /**
+     * @param o Object to unwrap.
+     * @param keepBinary Keep binary flag.
+     * @param cpy Copy value flag.
+     * @return Unwrapped object.
+     */
+    public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
+        if (o == null)
+            return null;
+
+        return unwrapBinary(ctx, o, keepBinary, cpy);
+    }
+
+    /**
+     * @param col Collection of objects to unwrap.
+     * @param keepBinary Keep binary flag.
+     * @return Unwrapped collection.
+     */
+    public static Collection<Object> unwrapBinariesIfNeeded(CacheObjectValueContext ctx, Collection<Object> col,
+        boolean keepBinary) {
+        return unwrapBinariesIfNeeded(ctx, col, keepBinary, true);
+    }
+
+    /**
+     * @param col Collection to unwrap.
+     * @param keepBinary Keep binary flag.
+     * @param cpy Copy flag.
+     * @return Unwrapped collection.
+     */
+    private static Collection<Object> unwrapKnownCollection(CacheObjectValueContext ctx, Collection<Object> col,
+        boolean keepBinary, boolean cpy) {
+        Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
+
+        assert col0 != null;
+
+        for (Object obj : col)
+            col0.add(unwrapBinary(ctx, obj, keepBinary, cpy));
+
+        return col0;
+    }
+
+    /**
+     * Unwraps map.
+     *
+     * @param map Map to unwrap.
+     * @param keepBinary Keep binary flag.
+     * @return Unwrapped map.
+     */
+    private static Map<Object, Object> unwrapBinariesIfNeeded(CacheObjectValueContext ctx, Map<Object, Object> map,
+        boolean keepBinary, boolean cpy) {
+        if (keepBinary)
+            return map;
+
+        Map<Object, Object> map0 = BinaryUtils.newMap(map);
+
+        for (Map.Entry<Object, Object> e : map.entrySet())
+            map0.put(unwrapBinary(ctx, e.getKey(), false, cpy), unwrapBinary(ctx, e.getValue(), false, cpy));
+
+        return map0;
+    }
+
+    /**
+     * @param col Collection to unwrap.
+     * @param keepBinary Keep binary flag.
+     * @param cpy Copy value flag.
+     * @return Unwrapped collection.
+     */
+    private static Collection<Object> unwrapBinariesIfNeeded(CacheObjectValueContext ctx, Collection<Object> col,
+        boolean keepBinary, boolean cpy) {
+        Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
+
+        if (col0 == null)
+            col0 = new ArrayList<>(col.size());
+
+        for (Object obj : col)
+            col0.add(unwrapBinary(ctx, obj, keepBinary, cpy));
+
+        return col0;
+    }
+
+    /**
+     * Unwrap array of binaries if needed.
+     *
+     * @param arr Array.
+     * @param keepBinary Keep binary flag.
+     * @param cpy Copy.
+     * @return Result.
+     */
+    private static Object[] unwrapBinariesInArrayIfNeeded(CacheObjectValueContext ctx, Object[] arr, boolean keepBinary,
+        boolean cpy) {
+        if (BinaryUtils.knownArray(arr))
+            return arr;
+
+        Object[] res = new Object[arr.length];
+
+        for (int i = 0; i < arr.length; i++)
+            res[i] = unwrapBinary(ctx, arr[i], keepBinary, cpy);
+
+        return res;
+    }
+
+    /**
+     * @param o Object to unwrap.
+     * @return Unwrapped object.
+     */
+    @SuppressWarnings("unchecked")
+    private static Object unwrapBinary(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
+        if (o instanceof Map.Entry) {
+            Map.Entry entry = (Map.Entry)o;
+
+            Object key = entry.getKey();
+
+            Object uKey = unwrapBinary(ctx, key, keepBinary, cpy);
+
+            Object val = entry.getValue();
+
+            Object uVal = unwrapBinary(ctx, val, keepBinary, cpy);
+
+            return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
+        }
+        else if (BinaryUtils.knownCollection(o))
+            return unwrapKnownCollection(ctx, (Collection<Object>)o, keepBinary, cpy);
+        else if (BinaryUtils.knownMap(o))
+            return unwrapBinariesIfNeeded(ctx, (Map<Object, Object>)o, keepBinary, cpy);
+        else if (o instanceof Object[])
+            return unwrapBinariesInArrayIfNeeded(ctx, (Object[])o, keepBinary, cpy);
+        else if (o instanceof CacheObject) {
+            CacheObject co = (CacheObject)o;
+
+            if (!keepBinary || co.isPlatformType())
+                return unwrapBinary(ctx, co.value(ctx, cpy), keepBinary, cpy);
+        }
+
+        return o;
+    }
+
+    /**
+     * Private constructor.
+     */
+    private CacheObjectUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java
new file mode 100644
index 0000000..49b2873
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.GridKernalContext;
+
+/**
+ * Context to get value of cache object.
+ */
+public interface CacheObjectValueContext {
+    /**
+     * @return Kernal context.
+     */
+    public GridKernalContext kernalContext();
+
+    /**
+     * @return Copy on get flag.
+     */
+    public boolean copyOnGet();
+
+    /**
+     * @return {@code True} if should store unmarshalled value in cache.
+     */
+    public boolean storeValue();
+
+    /**
+     * @return {@code True} if deployment info should be associated with the objects of this cache.
+     */
+    public boolean addDeploymentInfo();
+
+    /**
+     * @return Binary enabled flag.
+     */
+    public boolean binaryEnabled();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index a3e70dd..e637122 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1713,7 +1713,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Unwrapped collection.
      */
     public Collection<Object> unwrapBinariesIfNeeded(Collection<Object> col, boolean keepBinary) {
-        return cacheObjCtx.unwrapBinariesIfNeeded(col, keepBinary);
+        return CacheObjectUtils.unwrapBinariesIfNeeded(cacheObjCtx, col, keepBinary);
     }
 
     /**
@@ -1724,7 +1724,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Unwrapped object.
      */
     public Object unwrapBinaryIfNeeded(Object o, boolean keepBinary) {
-        return cacheObjCtx.unwrapBinaryIfNeeded(o, keepBinary);
+        return unwrapBinaryIfNeeded(o, keepBinary, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 687b132..93c5950 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -309,7 +309,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
                 oldVal0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(oldVal, keepBinary, false);
             }
             catch (Exception e) {
-                if (!cctx.cacheObjectContext().processor().isBinaryEnabled(cctx.config()))
+                if (!cctx.cacheObjectContext().kernalContext().cacheObjects().isBinaryEnabled(cctx.config()))
                     throw e;
 
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 15e4469..4f87658 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3840,13 +3840,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         /** {@inheritDoc} */
         @Override public K getKey() {
-            return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary);
+            return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, true);
         }
 
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
         @Override public V getValue() {
-            return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(peekVisibleValue(), keepBinary);
+            return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(peekVisibleValue(), keepBinary, true);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index b9efab1..c50672b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -75,7 +75,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     /** {@inheritDoc} */
     @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException {
         if (valBytes == null)
-            valBytes = ctx.processor().marshal(ctx, val);
+            valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
 
         return valBytes;
     }
@@ -94,7 +94,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
         assert val != null;
 
         return (T)val;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
index 26c713c..3b3cf67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
@@ -53,10 +53,8 @@ public class CacheObjectBinaryContext extends CacheObjectContext {
         this.binaryEnabled = binaryEnabled;
     }
 
-    /**
-     * @return Binary enabled flag.
-     */
-    public boolean binaryEnabled() {
+    /** {@inheritDoc} */
+    @Override public boolean binaryEnabled() {
         return binaryEnabled;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 14947e9..5567809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -735,7 +736,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException {
+    @Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException {
         if (!((CacheObjectBinaryContext)ctx).binaryEnabled() || binaryMarsh == null)
             return super.marshal(ctx, val);
 
@@ -747,7 +748,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
-    @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
+    @Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
         throws IgniteCheckedException {
         if (!((CacheObjectBinaryContext)ctx).binaryEnabled() || binaryMarsh == null)
             return super.unmarshal(ctx, bytes, clsLdr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index afeada5..955ca69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -283,7 +283,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
             byte[] bytes = PageUtils.getBytes(addr, off, len);
             off += len;
 
-            key = coctx.processor().toKeyCacheObject(coctx, type, bytes);
+            key = coctx.kernalContext().cacheObjects().toKeyCacheObject(coctx, type, bytes);
 
             if (rowData == RowData.KEY_ONLY)
                 return;
@@ -300,7 +300,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
         byte[] bytes = PageUtils.getBytes(addr, off, len);
         off += len;
 
-        val = coctx.processor().toCacheObject(coctx, type, bytes);
+        val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes);
 
         ver = CacheVersionIO.read(addr + off, false);
 
@@ -359,7 +359,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
         ByteBuffer buf,
         IncompleteCacheObject incomplete
     ) throws IgniteCheckedException {
-        incomplete = coctx.processor().toKeyCacheObject(coctx, buf, incomplete);
+        incomplete = coctx.kernalContext().cacheObjects().toKeyCacheObject(coctx, buf, incomplete);
 
         if (incomplete.isReady()) {
             key = (KeyCacheObject)incomplete.object();
@@ -384,7 +384,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
         ByteBuffer buf,
         IncompleteCacheObject incomplete
     ) throws IgniteCheckedException {
-        incomplete = coctx.processor().toCacheObject(coctx, buf, incomplete);
+        incomplete = coctx.kernalContext().cacheObjects().toCacheObject(coctx, buf, incomplete);
 
         if (incomplete.isReady()) {
             val = incomplete.object();

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 9ad084e..8c10e53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1728,7 +1728,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                         for (KeyCacheObject cacheKey : lockKeys) {
                             K keyVal = (K)
                                 (keepCacheObjects ? cacheKey :
-                                    cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
+                                    cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary,
+                                        true));
 
                             if (retMap.containsKey(keyVal))
                                 // We already have a return value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
index 97db698..a613184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -133,15 +134,15 @@ public class TxDeadlock {
             IgniteTxKey txKey = e.getKey();
 
             try {
-                CacheObjectContext objCtx = ctx.cacheObjectContext(txKey.cacheId());
+                GridCacheContext cctx = ctx.cacheContext(txKey.cacheId());
 
-                Object val = txKey.key().value(objCtx, true);
+                Object val = txKey.key().value(cctx.cacheObjectContext(), true);
 
                 sb.append(e.getValue())
                     .append(" [key=")
                     .append(val)
                     .append(", cache=")
-                    .append(objCtx.cacheName())
+                    .append(cctx.name())
                     .append("]\n");
             }
             catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 9beb296..ee2d1f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -116,7 +117,7 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
      * @return Value bytes.
      * @throws IgniteCheckedException If failed.
      */
-    public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException;
+    public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException;
 
     /**
      * @param ctx Context.
@@ -125,7 +126,8 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
      * @return Unmarshalled object.
      * @throws IgniteCheckedException If failed.
      */
-    public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) throws IgniteCheckedException;
+    public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
+        throws IgniteCheckedException;
 
     /**
      * @param ccfg Cache configuration.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index a8595fb..de9256c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
 import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
@@ -55,8 +56,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     /** */
     private IgniteBinary noOpBinary = new NoOpBinary();
 
-    /**
-     *
+    /*
+     * Static initializer
      */
     static {
         IMMUTABLE_CLS.add(String.class);
@@ -99,12 +100,12 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException {
+    @Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException {
         return CU.marshal(ctx.kernalContext().cache().context(), ctx.addDeploymentInfo(), val);
     }
 
     /** {@inheritDoc} */
-    @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
+    @Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
         throws IgniteCheckedException {
         return U.unmarshal(ctx.kernalContext(), bytes, U.resolveClassLoader(clsLdr, ctx.kernalContext().config()));
     }
@@ -166,7 +167,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
                 throw new IllegalArgumentException("Byte arrays cannot be used as cache keys.");
 
             case CacheObject.TYPE_REGULAR:
-                return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes, -1);
+                return new KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, null), bytes, -1);
         }
 
         throw new IllegalArgumentException("Invalid object type: " + type);
@@ -392,14 +393,18 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
         /** {@inheritDoc} */
         @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
             try {
-                if (!ctx.processor().immutable(val)) {
+                IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
+                if (!proc.immutable(val)) {
                     if (valBytes == null)
-                        valBytes = ctx.processor().marshal(ctx, val);
+                        valBytes = proc.marshal(ctx, val);
+
+                    boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
 
-                    ClassLoader ldr = ctx.p2pEnabled() ?
+                    ClassLoader ldr = p2pEnabled ?
                         IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
 
-                    Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
+                    Object val = proc.unmarshal(ctx, valBytes, ldr);
 
                     KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
 
@@ -443,22 +448,26 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
             return super.value(ctx, false); // Do not need copy since user value is not in cache.
         }
 
         /** {@inheritDoc} */
         @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
             try {
+                IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
                 if (valBytes == null)
-                    valBytes = ctx.processor().marshal(ctx, val);
+                    valBytes = proc.marshal(ctx, val);
 
                 if (ctx.storeValue()) {
-                    ClassLoader ldr = ctx.p2pEnabled() ?
+                    boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
+
+                    ClassLoader ldr = p2pEnabled ?
                         IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader();
 
-                    Object val = this.val != null && ctx.processor().immutable(this.val) ? this.val :
-                        ctx.processor().unmarshal(ctx, valBytes, ldr);
+                    Object val = this.val != null && proc.immutable(this.val) ? this.val :
+                        proc.unmarshal(ctx, valBytes, ldr);
 
                     return new CacheObjectImpl(val, valBytes);
                 }
@@ -493,7 +502,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
             return super.value(ctx, false); // Do not need copy since user value is not in cache.
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java
new file mode 100644
index 0000000..08f71c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+
+/**
+ * Cache object value context for queries.
+ */
+public class CacheQueryObjectValueContext implements CacheObjectValueContext {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public CacheQueryObjectValueContext(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridKernalContext kernalContext() {
+        return ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean copyOnGet() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeValue() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return ctx.config().isPeerClassLoadingEnabled() && !binaryEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean binaryEnabled() {
+        return ctx.config().getMarshaller() instanceof BinaryMarshaller;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
index b006c75..b0e1562 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.query;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -31,19 +33,20 @@ public class GridQueryCacheObjectsIterator implements Iterator<List<?>>, AutoClo
     private final Iterator<List<?>> iter;
 
     /** */
-    private final GridCacheContext<?,?> cctx;
+    private final CacheObjectValueContext cacheObjValCtx;
 
     /** */
     private final boolean keepBinary;
 
     /**
      * @param iter Iterator.
-     * @param cctx Cache context.
+     * @param cacheObjValCtx Cache object value context.
      * @param keepBinary Keep binary.
      */
-    public GridQueryCacheObjectsIterator(Iterator<List<?>> iter, GridCacheContext<?,?> cctx, boolean keepBinary) {
+    public GridQueryCacheObjectsIterator(Iterator<List<?>> iter, CacheObjectValueContext cacheObjValCtx,
+        boolean keepBinary) {
         this.iter = iter;
-        this.cctx = cctx;
+        this.cacheObjValCtx = cacheObjValCtx;
         this.keepBinary = keepBinary;
     }
 
@@ -61,7 +64,8 @@ public class GridQueryCacheObjectsIterator implements Iterator<List<?>>, AutoClo
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public List<?> next() {
-        return (List<?>)cctx.unwrapBinariesIfNeeded((Collection<Object>)iter.next(), keepBinary);
+        return ((List<?>)CacheObjectUtils.unwrapBinariesIfNeeded(
+            cacheObjValCtx, (Collection<Object>)iter.next(), keepBinary));
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 76cde17..65151c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1587,10 +1587,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         try {
             CacheObjectContext coctx = cacheObjectContext(cacheName);
 
-            QueryTypeDescriptorImpl desc = typeByValue(coctx, key, val, true);
+            QueryTypeDescriptorImpl desc = typeByValue(cacheName, coctx, key, val, true);
 
             if (prevVal != null) {
-                QueryTypeDescriptorImpl prevValDesc = typeByValue(coctx, key, prevVal, false);
+                QueryTypeDescriptorImpl prevValDesc = typeByValue(cacheName, coctx, key, prevVal, false);
 
                 if (prevValDesc != null && prevValDesc != desc)
                     idx.remove(cacheName, prevValDesc, key, partId, prevVal, prevVer);
@@ -1607,6 +1607,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param cacheName Cache name.
      * @param coctx Cache context.
      * @param key Key.
      * @param val Value.
@@ -1615,7 +1616,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If type check failed.
      */
     @SuppressWarnings("ConstantConditions")
-    @Nullable private QueryTypeDescriptorImpl typeByValue(CacheObjectContext coctx,
+    @Nullable private QueryTypeDescriptorImpl typeByValue(String cacheName,
+        CacheObjectContext coctx,
         KeyCacheObject key,
         CacheObject val,
         boolean checkType)
@@ -1629,12 +1631,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (binaryVal) {
             int typeId = ctx.cacheObjects().typeId(val);
 
-            id = new QueryTypeIdKey(coctx.cacheName(), typeId);
+            id = new QueryTypeIdKey(cacheName, typeId);
         }
         else {
             valCls = val.value(coctx, false).getClass();
 
-            id = new QueryTypeIdKey(coctx.cacheName(), valCls);
+            id = new QueryTypeIdKey(cacheName, valCls);
         }
 
         QueryTypeDescriptorImpl desc = types.get(id);
@@ -2078,7 +2080,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         try {
             CacheObjectContext coctx = cacheObjectContext(cacheName);
 
-            QueryTypeDescriptorImpl desc = typeByValue(coctx, key, val, false);
+            QueryTypeDescriptorImpl desc = typeByValue(cacheName, coctx, key, val, false);
 
             if (desc == null)
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index d77c8c0..9e11cdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -32,8 +32,8 @@ public class GridRunningQueryInfo {
     /** Query type. */
     private final GridCacheQueryType qryType;
 
-    /** */
-    private final String cache;
+    /** Schema name. */
+    private final String schemaName;
 
     /** */
     private final long startTime;
@@ -48,17 +48,17 @@ public class GridRunningQueryInfo {
      * @param id Query ID.
      * @param qry Query text.
      * @param qryType Query type.
-     * @param cache Cache where query was executed.
+     * @param schemaName Schema name.
      * @param startTime Query start time.
      * @param cancel Query cancel.
      * @param loc Local query flag.
      */
-    public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+    public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String schemaName, long startTime,
         GridQueryCancel cancel, boolean loc) {
         this.id = id;
         this.qry = qry;
         this.qryType = qryType;
-        this.cache = cache;
+        this.schemaName = schemaName;
         this.startTime = startTime;
         this.cancel = cancel;
         this.loc = loc;
@@ -86,10 +86,10 @@ public class GridRunningQueryInfo {
     }
 
     /**
-     * @return Cache where query was executed.
+     * @return Schema name.
      */
-    public String cache() {
-        return cache;
+    public String schemaName() {
+        return schemaName;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
index 8d00dd6..9d1da00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
@@ -90,7 +90,7 @@ public class VisorRunningQueriesCollectorTask extends VisorMultiNodeTask<VisorRu
             long curTime = U.currentTimeMillis();
 
             for (GridRunningQueryInfo qry : queries)
-                res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(),
+                res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.schemaName(),
                     qry.startTime(), curTime - qry.startTime(),
                     qry.cancelable(), qry.local()));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
index a49e095..e144325 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -235,7 +236,7 @@ public class GridAffinityNoCacheSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
             A.notNull(ctx, "ctx");
 
             return (T)val;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
index 5be277a..77b6836 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
@@ -102,7 +102,7 @@ public class IgniteIncompleteCacheObjectSelfTest extends GridCommonAbstractTest
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(final CacheObjectContext ctx, final boolean cpy) {
+        @Nullable @Override public <T> T value(final CacheObjectValueContext ctx, final boolean cpy) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index 5f61bd6..a487218 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.database.MemoryMetricsImpl;
@@ -463,7 +464,7 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
             return (T)data;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 0474aeb..352fe85 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -282,7 +282,7 @@ public class DmlStatementsProcessor {
             QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
                 @Override public Iterator<List<?>> iterator() {
                     try {
-                        return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+                        return new GridQueryCacheObjectsIterator(res.iterator(), idx.valueContext(), cctx.keepBinary());
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);
@@ -372,7 +372,7 @@ public class DmlStatementsProcessor {
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
                 @Override public Iterator<List<?>> iterator() {
                     try {
-                        return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+                        return new GridQueryCacheObjectsIterator(res.iterator(), idx.valueContext(), cctx.keepBinary());
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c94c215..508c8be 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -303,6 +305,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     protected volatile GridKernalContext ctx;
 
+    /** Cache object value context. */
+    protected CacheQueryObjectValueContext valCtx;
+
     /** */
     private DmlStatementsProcessor dmlProc;
 
@@ -339,14 +344,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param cacheName Cache name.
-     * @return Connection.
-     */
-    public Connection connectionForCache(String cacheName) {
-        return connectionForSchema(schema(cacheName));
-    }
-
-    /**
      * @param schema Schema.
      * @return Connection.
      */
@@ -403,7 +400,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException {
-        return prepareStatement(connectionForCache(cacheName), sql, true);
+        String schemaName = schema(cacheName);
+
+        return prepareStatement(connectionForSchema(schemaName), sql, true);
     }
 
     /** {@inheritDoc} */
@@ -918,7 +917,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public long streamUpdateQuery(String cacheName, String qry,
         @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
-        final Connection conn = connectionForCache(cacheName);
+        String schemaName = schema(cacheName);
+
+        final Connection conn = connectionForSchema(schemaName);
 
         final PreparedStatement stmt;
 
@@ -1117,7 +1118,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
                 @Override public Iterator<List<?>> iterator() {
                     try {
-                        return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+                        return new GridQueryCacheObjectsIterator(res.iterator(), valueContext(), keepBinary);
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);
@@ -1231,7 +1232,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param keepCacheObj Flag to keep cache object.
      * @param enforceJoinOrder Enforce join order of tables.
@@ -1239,7 +1240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Iterable result.
      */
     private Iterable<List<?>> runQueryTwoStep(
-        final GridCacheContext<?,?> cctx,
+        final String schemaName,
         final GridCacheTwoStepQuery qry,
         final boolean keepCacheObj,
         final boolean enforceJoinOrder,
@@ -1250,7 +1251,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     ) {
         return new Iterable<List<?>>() {
             @Override public Iterator<List<?>> iterator() {
-                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params,
+                return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params,
                     parts);
             }
         };
@@ -1484,7 +1485,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             cancel = new GridQueryCancel();
 
         QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-            runQueryTwoStep(cctx, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
+            runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
                 qry.getArgs(), qry.getPartitions()), cancel);
 
         cursor.fieldsMeta(meta);
@@ -1900,12 +1901,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed or {@code -1} if the type is unknown.
      */
     long size(String cacheName, String typeName) throws IgniteCheckedException {
+        String schemaName = schema(cacheName);
+
         H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
 
         if (tbl == null)
             return -1;
 
-        Connection conn = connectionForCache(cacheName);
+        Connection conn = connectionForSchema(schemaName);
 
         H2Utils.setupConnection(conn, false, false);
 
@@ -1999,6 +2002,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         else {
             this.ctx = ctx;
 
+            valCtx = new CacheQueryObjectValueContext(ctx);
+
             nodeId = ctx.localNodeId();
             marshaller = ctx.config().getMarshaller();
 
@@ -2031,6 +2036,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @return Value context.
+     */
+    public CacheObjectValueContext valueContext() {
+        return valCtx;
+    }
+
+    /**
      * @param topic Topic.
      * @param topicOrd Topic ordinal for {@link GridTopic}.
      * @param nodes Nodes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index 48c0cb9..93ebc71 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -356,7 +356,7 @@ public class GridLuceneIndex implements AutoCloseable {
             if (coctx == null) // For tests.
                 return (Z)JdbcUtils.deserialize(bytes, null);
 
-            return (Z)coctx.processor().unmarshal(coctx, bytes, ldr);
+            return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx, bytes, ldr);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 6fff8de..b7bdde5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -579,7 +579,9 @@ public class GridMapQueryExecutor {
                 }
             }
 
-            Connection conn = h2.connectionForCache(mainCctx.name());
+            String schemaName = h2.schema(mainCctx.name());
+
+            Connection conn = h2.connectionForSchema(schemaName);
 
             H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);