Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/29 09:30:42 UTC

[16/24] ignite git commit: IGNITE-5311: Added ability to get CacheObject value without CacheObjectContext. This closes #2019.
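
The commit reworks value resolution in the indexing module so that a CacheObject can be
unwrapped through the lighter-weight CacheObjectValueContext instead of a full
CacheObjectContext (see the new value(CacheObjectValueContext, boolean) signature in the
test diff below), and it extracts the reduce-side query state into a dedicated
ReduceQueryRun class that is keyed by schema name rather than cache name. Below is a
minimal sketch of the new value-resolution call, assuming the commit is applied; the
wrapper class and method are hypothetical and only illustrate the intent:

    // Illustrative sketch only. CacheObjectValueContext and the value(ctx, cpy)
    // signature come from this commit; the helper class itself is hypothetical.
    import org.apache.ignite.internal.processors.cache.CacheObject;
    import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;

    class CacheObjectValues {
        /** Unwraps the stored value (optionally copying it) using only the value context. */
        static <T> T unwrap(CacheObjectValueContext valCtx, CacheObject obj, boolean cpy) {
            return obj.value(valCtx, cpy); // Previously required a full CacheObjectContext.
        }
    }

On the reduce side, GridReduceQueryExecutor now obtains such a context from the indexing
module itself (h2.valueContext() in the GridQueryCacheObjectsIterator hunk), which,
together with the schema-based connection lookup, removes the need to pass a
GridCacheContext into query(...).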

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a31263f..8d9d953 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.CacheException;
@@ -100,7 +99,6 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
@@ -132,7 +130,7 @@ public class GridReduceQueryExecutor {
     private final AtomicLong qryIdGen;
 
     /** */
-    private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>();
 
     /** */
     private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
@@ -191,8 +189,8 @@ public class GridReduceQueryExecutor {
             @Override public void onEvent(final Event evt) {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                for (QueryRun r : runs.values()) {
-                    for (GridMergeIndex idx : r.idxs) {
+                for (ReduceQueryRun r : runs.values()) {
+                    for (GridMergeIndex idx : r.indexes()) {
                         if (idx.hasSource(nodeId)) {
                             handleNodeLeft(r, nodeId);
 
@@ -208,7 +206,7 @@ public class GridReduceQueryExecutor {
      * @param r Query run.
      * @param nodeId Left node ID.
      */
-    private void handleNodeLeft(QueryRun r, UUID nodeId) {
+    private void handleNodeLeft(ReduceQueryRun r, UUID nodeId) {
         // Will attempt to retry. If reduce query was started it will fail on next page fetching.
         retry(r, h2.readyTopologyVersion(), nodeId);
     }
@@ -248,7 +246,7 @@ public class GridReduceQueryExecutor {
      * @param msg Message.
      */
     private void onFail(ClusterNode node, GridQueryFailResponse msg) {
-        QueryRun r = runs.get(msg.queryRequestId());
+        ReduceQueryRun r = runs.get(msg.queryRequestId());
 
         fail(r, node.id(), msg.error(), msg.failCode());
     }
@@ -258,7 +256,7 @@ public class GridReduceQueryExecutor {
      * @param nodeId Failed node ID.
      * @param msg Error message.
      */
-    private void fail(QueryRun r, UUID nodeId, String msg, byte failCode) {
+    private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
         if (r != null) {
             CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
 
@@ -278,21 +276,21 @@ public class GridReduceQueryExecutor {
         final int qry = msg.query();
         final int seg = msg.segmentId();
 
-        final QueryRun r = runs.get(qryReqId);
+        final ReduceQueryRun r = runs.get(qryReqId);
 
         if (r == null) // Already finished with error or canceled.
             return;
 
-        final int pageSize = r.pageSize;
+        final int pageSize = r.pageSize();
 
-        GridMergeIndex idx = r.idxs.get(msg.query());
+        GridMergeIndex idx = r.indexes().get(msg.query());
 
         GridResultPage page;
 
         try {
             page = new GridResultPage(ctx, node.id(), msg) {
                 @Override public void fetchNextPage() {
-                    Object errState = r.state.get();
+                    Object errState = r.state();
 
                     if (errState != null) {
                         CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
@@ -335,7 +333,7 @@ public class GridReduceQueryExecutor {
         if (msg.retry() != null)
             retry(r, msg.retry(), node.id());
         else if (msg.page() == 0) // Do count down on each first page received.
-            r.latch.countDown();
+            r.latch().countDown();
     }
 
     /**
@@ -343,7 +341,7 @@ public class GridReduceQueryExecutor {
      * @param retryVer Retry version.
      * @param nodeId Node ID.
      */
-    private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+    private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
         r.state(retryVer, nodeId);
     }
 
@@ -501,7 +499,7 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param keepPortable Keep portable.
      * @param enforceJoinOrder Enforce join order of tables.
@@ -512,7 +510,7 @@ public class GridReduceQueryExecutor {
      * @return Rows iterator.
      */
     public Iterator<List<?>> query(
-        GridCacheContext<?, ?> cctx,
+        String schemaName,
         GridCacheTwoStepQuery qry,
         boolean keepPortable,
         boolean enforceJoinOrder,
@@ -541,10 +539,8 @@ public class GridReduceQueryExecutor {
 
             final long qryReqId = qryIdGen.incrementAndGet();
 
-            final String cacheName = cctx.name();
-
-            final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), cacheName,
-                h2.connectionForCache(cacheName), qry.mapQueries().size(), qry.pageSize(),
+            final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
+                h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(),
                 U.currentTimeMillis(), cancel);
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
@@ -633,7 +629,7 @@ public class GridReduceQueryExecutor {
                     GridMergeTable tbl;
 
                     try {
-                        tbl = createMergeTable(r.conn, mapQry, qry.explain());
+                        tbl = createMergeTable(r.connection(), mapQry, qry.explain());
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);
@@ -641,7 +637,7 @@ public class GridReduceQueryExecutor {
 
                     idx = tbl.getMergeIndex();
 
-                    fakeTable(r.conn, tblIdx++).innerTable(tbl);
+                    fakeTable(r.connection(), tblIdx++).innerTable(tbl);
                 }
                 else
                     idx = GridMergeIndexUnsorted.createDummy(ctx);
@@ -659,13 +655,13 @@ public class GridReduceQueryExecutor {
                 else
                     idx.setSources(nodes, segmentsPerIndex);
 
-                idx.setPageSize(r.pageSize);
+                idx.setPageSize(r.pageSize());
 
-                r.idxs.add(idx);
+                r.indexes().add(idx);
             }
 
-            r.latch = new CountDownLatch(isReplicatedOnly ? 1 :
-                (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
+            r.latch(new CountDownLatch(isReplicatedOnly ? 1 :
+                (r.indexes().size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt));
 
             runs.put(qryReqId, r);
 
@@ -719,7 +715,7 @@ public class GridReduceQueryExecutor {
                         new GridH2QueryRequest()
                                 .requestId(qryReqId)
                                 .topologyVersion(topVer)
-                                .pageSize(r.pageSize)
+                                .pageSize(r.pageSize())
                                 .caches(qry.cacheIds())
                                 .tables(distributedJoins ? qry.tables() : null)
                                 .partitions(convert(partsMap))
@@ -731,7 +727,7 @@ public class GridReduceQueryExecutor {
                         false)) {
                     awaitAllReplies(r, nodes, cancel);
 
-                    Object state = r.state.get();
+                    Object state = r.state();
 
                     if (state != null) {
                         if (state instanceof CacheException) {
@@ -764,7 +760,7 @@ public class GridReduceQueryExecutor {
                         List<List<?>> res = new ArrayList<>();
 
                         // Simple UNION ALL can have multiple indexes.
-                        for (GridMergeIndex idx : r.idxs) {
+                        for (GridMergeIndex idx : r.indexes()) {
                             Cursor cur = idx.findInStream(null, null);
 
                             while (cur.next()) {
@@ -788,21 +784,19 @@ public class GridReduceQueryExecutor {
 
                         UUID locNodeId = ctx.localNodeId();
 
-                        H2Utils.setupConnection(r.conn, false, enforceJoinOrder);
+                        H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
 
                         GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
-                            .pageSize(r.pageSize).distributedJoinMode(OFF));
+                            .pageSize(r.pageSize()).distributedJoinMode(OFF));
 
                         try {
-                            String schema = h2.schema(cacheName);
-
                             if (qry.explain())
-                                return explainPlan(r.conn, schema, qry, params);
+                                return explainPlan(r.connection(), schemaName, qry, params);
 
                             GridCacheSqlQuery rdc = qry.reduceQuery();
 
-                            ResultSet res = h2.executeSqlQueryWithTimer(schema,
-                                r.conn,
+                            ResultSet res = h2.executeSqlQueryWithTimer(schemaName,
+                                r.connection(),
                                 rdc.query(),
                                 F.asList(rdc.parameters(params)),
                                 false, // The statement will cache some extra thread local objects.
@@ -824,10 +818,10 @@ public class GridReduceQueryExecutor {
                     continue;
                 }
 
-                return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
+                return new GridQueryCacheObjectsIterator(resIter, h2.valueContext(), keepPortable);
             }
             catch (IgniteCheckedException | RuntimeException e) {
-                U.closeQuiet(r.conn);
+                U.closeQuiet(r.connection());
 
                 if (e instanceof CacheException) {
                     if (wasCancelled((CacheException)e))
@@ -898,7 +892,7 @@ public class GridReduceQueryExecutor {
      * @param distributedJoins Distributed join flag.
      */
     private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes,
-        QueryRun r,
+        ReduceQueryRun r,
         long qryReqId,
         boolean distributedJoins)
     {
@@ -906,7 +900,7 @@ public class GridReduceQueryExecutor {
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
         else {
-            for (GridMergeIndex idx : r.idxs) {
+            for (GridMergeIndex idx : r.indexes()) {
                 if (!idx.fetchedAll()) {
                     send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
 
@@ -922,9 +916,9 @@ public class GridReduceQueryExecutor {
      * @param cancel Query cancel.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+    private void awaitAllReplies(ReduceQueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
         throws IgniteInterruptedCheckedException, QueryCancelledException {
-        while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+        while (!U.await(r.latch(), 500, TimeUnit.MILLISECONDS)) {
 
             cancel.checkCancelled();
 
@@ -932,7 +926,7 @@ public class GridReduceQueryExecutor {
                 if (!ctx.discovery().alive(node)) {
                     handleNodeLeft(r, node.id());
 
-                    assert r.latch.getCount() == 0;
+                    assert r.latch().getCount() == 0;
 
                     return;
                 }
@@ -1420,7 +1414,7 @@ public class GridReduceQueryExecutor {
         CacheException err = new CacheException("Query was cancelled, client node disconnected.",
             new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
 
-        for (Map.Entry<Long, QueryRun> e : runs.entrySet())
+        for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
             e.getValue().disconnected(err);
     }
 
@@ -1435,9 +1429,9 @@ public class GridReduceQueryExecutor {
 
         long curTime = U.currentTimeMillis();
 
-        for (QueryRun run : runs.values()) {
-            if (run.qry.longQuery(curTime, duration))
-                res.add(run.qry);
+        for (ReduceQueryRun run : runs.values()) {
+            if (run.queryInfo().longQuery(curTime, duration))
+                res.add(run.queryInfo());
         }
 
         return res;
@@ -1450,77 +1444,10 @@ public class GridReduceQueryExecutor {
      */
     public void cancelQueries(Collection<Long> queries) {
         for (Long qryId : queries) {
-            QueryRun run = runs.get(qryId);
+            ReduceQueryRun run = runs.get(qryId);
 
             if (run != null)
-                run.qry.cancel();
-        }
-    }
-
-    /**
-     * Query run.
-     */
-    private static class QueryRun {
-        /** */
-        private final GridRunningQueryInfo qry;
-
-        /** */
-        private final List<GridMergeIndex> idxs;
-
-        /** */
-        private CountDownLatch latch;
-
-        /** */
-        private final JdbcConnection conn;
-
-        /** */
-        private final int pageSize;
-
-        /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
-        private final AtomicReference<Object> state = new AtomicReference<>();
-
-        /**
-         * @param id Query ID.
-         * @param qry Query text.
-         * @param cache Cache where query was executed.
-         * @param conn Connection.
-         * @param idxsCnt Number of indexes.
-         * @param pageSize Page size.
-         * @param startTime Start time.
-         * @param cancel Query cancel handler.
-         */
-        private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
-            this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
-            this.conn = (JdbcConnection)conn;
-            this.idxs = new ArrayList<>(idxsCnt);
-            this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
-        }
-
-        /**
-         * @param o Fail state object.
-         * @param nodeId Node ID.
-         */
-        void state(Object o, @Nullable UUID nodeId) {
-            assert o != null;
-            assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
-
-            if (!state.compareAndSet(null, o))
-                return;
-
-            while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
-                latch.countDown();
-
-            CacheException e = o instanceof CacheException ? (CacheException) o : null;
-
-            for (GridMergeIndex idx : idxs) // Fail all merge indexes.
-                idx.fail(nodeId, e);
-        }
-
-        /**
-         * @param e Error.
-         */
-        void disconnected(CacheException e) {
-            state(e, null);
+                run.queryInfo().cancel();
         }
     }
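
A note on the latch arithmetic in the hunk above: the CountDownLatch created per run
counts one first page per (map query, node, segment) combination, except that each
replicated map query contributes a single reply, and a fully replicated query waits for
exactly one. For illustration (assumed figures only): with 3 merge indexes of which 1
belongs to a replicated query, 4 nodes and 2 segments per index, the reducer waits for
(3 - 1) * 4 * 2 + 1 = 17 first pages, counting the latch down once per first page
received in onNextPage.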
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
new file mode 100644
index 0000000..73bb002
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.h2.jdbc.JdbcConnection;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.CacheException;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
+/**
+ * Query run.
+ */
+class ReduceQueryRun {
+    /** */
+    private final GridRunningQueryInfo qry;
+
+    /** */
+    private final List<GridMergeIndex> idxs;
+
+    /** */
+    private CountDownLatch latch;
+
+    /** */
+    private final JdbcConnection conn;
+
+    /** */
+    private final int pageSize;
+
+    /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
+    private final AtomicReference<Object> state = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param id Query ID.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param conn Connection.
+     * @param idxsCnt Number of indexes.
+     * @param pageSize Page size.
+     * @param startTime Start time.
+     * @param cancel Query cancel handler.
+     */
+    ReduceQueryRun(Long id, String qry, String schemaName, Connection conn, int idxsCnt, int pageSize, long startTime,
+        GridQueryCancel cancel) {
+        this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, schemaName, startTime, cancel, false);
+
+        this.conn = (JdbcConnection)conn;
+
+        this.idxs = new ArrayList<>(idxsCnt);
+
+        this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+    }
+
+    /**
+     * @param o Fail state object.
+     * @param nodeId Node ID.
+     */
+    void state(Object o, @Nullable UUID nodeId) {
+        assert o != null;
+        assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
+
+        if (!state.compareAndSet(null, o))
+            return;
+
+        while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
+            latch.countDown();
+
+        CacheException e = o instanceof CacheException ? (CacheException) o : null;
+
+        for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+            idx.fail(nodeId, e);
+    }
+
+    /**
+     * @param e Error.
+     */
+    void disconnected(CacheException e) {
+        state(e, null);
+    }
+
+    /**
+     * @return Query info.
+     */
+    GridRunningQueryInfo queryInfo() {
+        return qry;
+    }
+
+    /**
+     * @return Page size.
+     */
+    int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return Connection.
+     */
+    JdbcConnection connection() {
+        return conn;
+    }
+
+    /**
+     * @return State.
+     */
+    Object state() {
+        return state.get();
+    }
+
+    /**
+     * @return Indexes.
+     */
+    List<GridMergeIndex> indexes() {
+        return idxs;
+    }
+
+    /**
+     * @return Latch.
+     */
+    CountDownLatch latch() {
+        return latch;
+    }
+
+    /**
+     * @param latch Latch.
+     */
+    void latch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+}
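
The new ReduceQueryRun carries the per-query state that GridReduceQueryExecutor
previously kept in its private QueryRun class, now exposed through accessors. Its state
field is a compare-and-set slot holding either a CacheException (map-side failure) or an
AffinityTopologyVersion (retry request); setting it drains the latch and fails every
merge index. A minimal sketch of how the reducer is expected to interpret that state
after awaiting first pages, mirroring the executor hunk above; the wrapper class and
method are illustrative only:

    // Illustrative sketch only. The state() accessor and its CacheException /
    // AffinityTopologyVersion semantics come from this commit; the wrapper is hypothetical.
    package org.apache.ignite.internal.processors.query.h2.twostep;

    import javax.cache.CacheException;
    import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;

    final class ReduceStateHandling {
        /** @return {@code true} if the map phase should be retried on a newer topology. */
        static boolean shouldRetry(ReduceQueryRun run) {
            Object state = run.state();

            if (state == null)
                return false; // All map nodes delivered their first pages.

            if (state instanceof CacheException)
                throw (CacheException)state; // Map-side failure: propagate to the caller.

            assert state instanceof AffinityTopologyVersion : state.getClass();

            return true; // Topology changed under the query: retry.
        }
    }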

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 5ac02a5..1f73dcb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
@@ -726,7 +727,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
             return (T)val;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index 5939b59..b66a343 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -870,7 +870,9 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
 
         IgniteH2Indexing idx = U.field(qryProcessor, "idx");
 
-        return (JdbcConnection)idx.connectionForCache(DEFAULT_CACHE_NAME);
+        String schemaName = idx.schema(DEFAULT_CACHE_NAME);
+
+        return (JdbcConnection)idx.connectionForSchema(schemaName);
     }
 
     /**
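
The GridQueryParsingTest change reflects the broader switch in IgniteH2Indexing from
cache-name-based to schema-name-based connection lookup: callers first resolve the SQL
schema for a cache and then ask for a connection by schema. A minimal sketch of that
two-step lookup, assuming an IgniteH2Indexing instance obtained as in the test; the
wrapper class and its throws clause are hypothetical:

    // Illustrative sketch only. schema(String) and connectionForSchema(String) appear in
    // this commit; the wrapper class itself is hypothetical.
    import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
    import org.h2.jdbc.JdbcConnection;

    final class SchemaConnections {
        static JdbcConnection connectionFor(IgniteH2Indexing idx, String cacheName) throws Exception {
            String schemaName = idx.schema(cacheName); // Map the cache name to its SQL schema.

            return (JdbcConnection)idx.connectionForSchema(schemaName);
        }
    }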