You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/19 13:12:49 UTC

[6/8] incubator-ignite git commit: ignite-gg9499 - fixes

ignite-gg9499 - fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1226b240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1226b240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1226b240

Branch: refs/heads/ignite-gg9499
Commit: 1226b2401b58e0e75d80e01787c0cee39ce5aeee
Parents: 29823e6
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:09:04 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:09:04 2014 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  |  11 +-
 .../query/h2/twostep/GridMergeIndex.java        | 116 ++++++++++++++-----
 .../h2/twostep/GridMergeIndexUnsorted.java      |  66 ++++++-----
 .../query/h2/twostep/GridMergeTable.java        |   4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  34 +++---
 .../query/h2/twostep/GridResultPage.java        |  53 +++------
 .../twostep/messages/GridNextPageResponse.java  |  29 ++++-
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  34 +++++-
 8 files changed, 240 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b86caf1..92d1875 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -211,19 +211,26 @@ public class GridMapQueryExecutor {
 
         assert res != null;
 
+        boolean last = false;
+
         synchronized (res) {
             page = qr.pages[qry]++;
 
             for (int i = 0 ; i < pageSize; i++) {
-                if (!res.next())
+                if (!res.next()) {
+                    last = true;
+
                     break;
+                }
 
                 rows.add(res.currentRow());
             }
         }
 
         try {
-            ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows));
+            ctx.io().sendUserMessage(F.asList(node),
+                new GridNextPageResponse(qr.qryReqId, qry, page, allRows, last, rows),
+                GridTopic.TOPIC_QUERY, false, 0);
         }
         catch (IgniteCheckedException e) {
             log.error("Failed to send message.", e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 16ba15d..5d51b32 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -10,11 +10,14 @@
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
 import org.apache.ignite.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
 import org.h2.engine.*;
 import org.h2.index.*;
 import org.h2.message.*;
 import org.h2.result.*;
 import org.h2.table.*;
+import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -25,16 +28,32 @@ import java.util.concurrent.atomic.*;
  */
 public abstract class GridMergeIndex extends BaseIndex {
     /** */
+    protected final GridResultPage<?> END = new GridResultPage<Object>(null, null);
+
+    /** */
     private static final int MAX_FETCH_SIZE = 100000;
 
     /** */
     private final AtomicInteger cnt = new AtomicInteger(0);
 
+    /** Number of result sources that have not yet delivered their last page. */
+    private final AtomicInteger srcs = new AtomicInteger(0);
+
     /**
      * Will be r/w from query execution thread only, does not need to be threadsafe.
      */
     private ArrayList<Row> fetched = new ArrayList<>();
 
+    /**
+     * @param tbl Table.
+     * @param name Index name.
+     * @param type Type.
+     * @param cols Columns.
+     */
+    public GridMergeIndex(GridMergeTable tbl, String name, IndexType type, IndexColumn[] cols) {
+        initBaseIndex(tbl, 0, name, cols, type);
+    }
+
     /** {@inheritDoc} */
     @Override public long getRowCount(Session session) {
         return cnt.get();
@@ -46,6 +65,13 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     * @param srcs Number of sources.
+     */
+    public void setNumberOfSources(int srcs) {
+        this.srcs.set(srcs);
+    }
+
+    /**
      * @param cnt Count.
      */
     public void addCount(int cnt) {
@@ -55,7 +81,26 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * @param page Page.
      */
-    public abstract void addPage(GridResultPage<?> page);
+    public final void addPage(GridResultPage<?> page) {
+        if (!page.response().rows().isEmpty())
+            addPage0(page);
+        else
+            assert page.response().isLast();
+
+        if (page.response().isLast()) {
+            int srcs0 = srcs.decrementAndGet();
+
+            assert srcs0 >= 0;
+
+            if (srcs0 == 0)
+                addPage0(END); // We've fetched all.
+        }
+    }
+
+    /**
+     * @param page Page.
+     */
+    protected abstract void addPage0(GridResultPage<?> page);
 
     /** {@inheritDoc} */
     @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
@@ -145,7 +190,7 @@ public abstract class GridMergeIndex extends BaseIndex {
      */
     protected class IteratorCursor implements Cursor {
         /** */
-        private Iterator<Row> iter;
+        protected Iterator<Row> iter;
 
         /** */
         protected Row cur;
@@ -185,47 +230,66 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * Fetching cursor.
      */
-    protected abstract class FetchingCursor extends IteratorCursor {
+    protected class FetchingCursor extends IteratorCursor {
         /** */
-        private boolean canFetch = true;
+        private Iterator<Row> stream;
 
         /**
          */
-        public FetchingCursor() {
-            super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator());
-        }
+        public FetchingCursor(Iterator<Row> stream) {
+            super(new FetchedIterator());
 
-        /**
-         * @return Next row or {@code null} if none available.
-         */
-        @Nullable protected abstract Row fetchNext();
+            assert stream != null;
+
+            this.stream = stream;
+        }
 
         /** {@inheritDoc} */
         @Override public boolean next() {
-            if (super.next())
+            if (super.next()) {
+                assert cur != null;
+
+                if (iter == stream && fetched != null) { // Cache fetched rows for reuse.
+                    if (fetched.size() == MAX_FETCH_SIZE)
+                        fetched = null; // Throw away fetched result if it is too large.
+                    else
+                        fetched.add(cur);
+                }
+
+                X.println("__ row: " + Arrays.toString(cur.getValueList()));
+
                 return true;
+            }
 
-            if (!canFetch)
+            if (iter == stream) // We've fetched the stream.
                 return false;
 
-            cur = fetchNext();
-
-            if (cur == null) { // No more results to fetch.
-                assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get();
+            iter = stream; // Switch from cached to stream.
 
-                canFetch = false;
+            return next();
+        }
+    }
 
-                return false;
-            }
+    /**
+     * Index-based iterator over the fetched rows; does not throw {@link ConcurrentModificationException}.
+     */
+    private class FetchedIterator implements Iterator<Row> {
+        /** */
+        private int idx;
 
-            if (fetched != null) { // Try to reuse fetched result.
-                fetched.add(cur);
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return fetched != null && idx < fetched.size();
+        }
 
-                if (fetched.size() == MAX_FETCH_SIZE)
-                    fetched = null; // Throw away fetched result if it is too large.
-            }
+        /** {@inheritDoc} */
+        @Override public Row next() {
+            return fetched.get(idx++);
+        }
 
-            return true;
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            throw new UnsupportedOperationException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index c6aaea9..b93c6ec 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -10,8 +10,10 @@
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
 import org.apache.ignite.*;
+import org.gridgain.grid.util.typedef.*;
 import org.h2.index.*;
 import org.h2.result.*;
+import org.h2.table.*;
 import org.h2.value.*;
 import org.jetbrains.annotations.*;
 
@@ -25,50 +27,60 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
     /** */
     private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
 
+    /**
+     * @param tbl  Table.
+     * @param name Index name.
+     */
+    public GridMergeIndexUnsorted(GridMergeTable tbl, String name) {
+        super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
+    }
+
     /** {@inheritDoc} */
-    @Override public void addPage(GridResultPage<?> page) {
+    @Override public void addPage0(GridResultPage<?> page) {
         queue.add(page);
     }
 
     /** {@inheritDoc} */
     @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
-        final GridResultPage<?> p = queue.poll();
-
-        assert p != null; // First page must be already fetched.
+        return new FetchingCursor(new Iterator<Row>() {
+            /** */
+            Iterator<Value[]> iter = Collections.emptyIterator();
 
-        if (p.isEmpty())
-            return new IteratorCursor(Collections.<Row>emptyIterator());
+            @Override public boolean hasNext() {
+                if (iter.hasNext())
+                    return true;
 
-        p.fetchNextPage(); // We always request next page before reading this one.
+                GridResultPage<?> page;
 
-        return new FetchingCursor() {
-            /** */
-            Iterator<Value[]> iter = p.rows().iterator();
+                try {
+                    page = queue.take();
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteException("Query execution was interrupted.", e);
+                }
 
-            @Nullable @Override protected Row fetchNext() {
-                if (!iter.hasNext()) {
-                    GridResultPage<?> page;
+                if (page == END) {
+                    assert queue.isEmpty() : "It must be the last page: " + queue;
 
-                    try {
-                        page = queue.take();
-                    }
-                    catch (InterruptedException e) {
-                        throw new IgniteException("Query execution was interrupted.", e);
-                    }
+                    return false; // We are done.
+                }
 
-                    if (page.isEmpty()) {
-                        assert queue.isEmpty() : "It must be the last page.";
+                page.fetchNextPage();
 
-                        return null; // Empty page - we are done.
-                    }
+                iter = page.response().rows().iterator();
 
-                    page.fetchNextPage();
+                assert iter.hasNext();
 
-                    iter = page.rows().iterator();
-                }
+                return true;
+            }
 
+            @Override public Row next() {
                 return new Row(iter.next(), 0);
             }
-        };
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
index a1f2213..683bb54 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -27,7 +27,7 @@ public class GridMergeTable extends TableBase {
     private final ArrayList<Index> idxs = new ArrayList<>(1);
 
     /** */
-    private final GridMergeIndex idx = new GridMergeIndexUnsorted();
+    private final GridMergeIndex idx;
 
     /**
      * @param data Data.
@@ -35,6 +35,8 @@ public class GridMergeTable extends TableBase {
     public GridMergeTable(CreateTableData data) {
         super(data);
 
+        idx = new GridMergeIndexUnsorted(this, "merge_scan");
+
         idxs.add(idx);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3e7e12c..a12b4f9 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,8 +59,8 @@ public class GridReduceQueryExecutor {
 
         // TODO handle node failure.
 
-        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+        ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
                 assert msg != null;
 
                 ClusterNode node = ctx.discovery().node(nodeId);
@@ -69,6 +69,8 @@ public class GridReduceQueryExecutor {
                     onNextPage(node, (GridNextPageResponse)msg);
                 else if (msg instanceof GridQueryFailResponse)
                     onFail(node, (GridQueryFailResponse)msg);
+
+                return true;
             }
         });
     }
@@ -86,7 +88,13 @@ public class GridReduceQueryExecutor {
 
         GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
 
-        idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) {
+        if (msg.allRows() != -1) { // Only the first page contains row count.
+            idx.addCount(msg.allRows());
+
+            r.latch.countDown();
+        }
+
+        idx.addPage(new GridResultPage<UUID>(node.id(), msg) {
             @Override public void fetchNextPage() {
                 try {
                     ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize));
@@ -96,12 +104,6 @@ public class GridReduceQueryExecutor {
                 }
             }
         });
-
-        if (msg.allRows() != -1) { // Only the first page contains row count.
-            idx.addCount(msg.allRows());
-
-            r.latch.countDown();
-        }
     }
 
     public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
@@ -118,17 +120,23 @@ public class GridReduceQueryExecutor {
             throw new IgniteException(e);
         }
 
-        for (GridCacheSqlQuery mapQry : qry.mapQueries())
-            r.tbls.add(createTable(r.conn, mapQry));
-
         Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow?
 
+        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+            GridMergeTable tbl = createTable(r.conn, mapQry);
+
+            tbl.getScanIndex(null).setNumberOfSources(nodes.size());
+
+            r.tbls.add(tbl);
+        }
+
         r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
 
         this.runs.put(qryReqId, r);
 
         try {
-            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size
+            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries()), // TODO conf page size
+                GridTopic.TOPIC_QUERY, false, 0);
 
             r.latch.await();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
index 6c6ab6a..ef52805 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -9,68 +9,51 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
-import org.h2.result.*;
-import org.h2.value.*;
-
-import java.util.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.typedef.internal.*;
 
 /**
  * Page result.
  */
-public abstract class GridResultPage<S> {
-    /** */
-    private final S src;
-
+public class GridResultPage<Z> {
     /** */
-    private final Collection<Value[]> rows;
+    private final Z src;
 
     /** */
-    private final int page;
+    private final GridNextPageResponse res;
 
     /**
      * @param src Source.
-     * @param page Page.
-     * @param rows Page rows.
+     * @param res Response.
      */
-    protected GridResultPage(S src, int page, Collection<Value[]> rows) {
-        assert src != null;
-        assert rows != null;
-
+    protected GridResultPage(Z src, GridNextPageResponse res) {
         this.src = src;
-        this.page = page;
-        this.rows = rows;
+        this.res = res;
     }
 
     /**
      * @return Result source.
      */
-    public S source() {
+    public Z source() {
         return src;
     }
 
     /**
-     * @return Page.
+     * @return Response.
      */
-    public int page() {
-        return page;
+    public GridNextPageResponse response() {
+        return res;
     }
 
     /**
-     * @return {@code true} If result is empty.
+     * Request next page.
      */
-    public boolean isEmpty() {
-        return rows.isEmpty();
+    public void fetchNextPage() {
+        throw new UnsupportedOperationException();
     }
 
-    /**
-     * @return Page rows.
-     */
-    public Collection<Value[]> rows() {
-        return rows;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridResultPage.class, this);
     }
-
-    /**
-     * Request next page.
-     */
-    public abstract void fetchNextPage();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
index de38172..0834f4c 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
 
+import org.gridgain.grid.util.typedef.internal.*;
 import org.h2.store.*;
 import org.h2.value.*;
 
@@ -34,20 +35,32 @@ public class GridNextPageResponse implements Externalizable {
     /** */
     private Collection<Value[]> rows;
 
+    /** */
+    private boolean last;
+
+    /**
+     * For {@link Externalizable}.
+     */
+    public GridNextPageResponse() {
+        // No-op.
+    }
+
     /**
      * @param qryReqId Query request ID.
      * @param qry Query.
      * @param page Page.
      * @param allRows All rows count.
+     * @param last {@code true} if this is the last page.
      * @param rows Rows.
      */
-    public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) {
+    public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last, Collection<Value[]> rows) {
         assert rows != null;
 
         this.qryReqId = qryReqId;
         this.qry = qry;
         this.page = page;
         this.allRows = allRows;
+        this.last = last;
         this.rows = rows;
     }
 
@@ -80,6 +93,13 @@ public class GridNextPageResponse implements Externalizable {
     }
 
     /**
+     * @return {@code true} If this is the last page.
+     */
+    public boolean isLast() {
+        return last;
+    }
+
+    /**
      * @return Rows.
      */
     public Collection<Value[]> rows() {
@@ -91,6 +111,7 @@ public class GridNextPageResponse implements Externalizable {
         out.writeLong(qryReqId);
         out.writeInt(qry);
         out.writeInt(page);
+        out.writeBoolean(last);
         out.writeInt(allRows);
 
         out.writeInt(rows.size());
@@ -122,6 +143,7 @@ public class GridNextPageResponse implements Externalizable {
         qryReqId = in.readLong();
         qry = in.readInt();
         page = in.readInt();
+        last = in.readBoolean();
         allRows = in.readInt();
 
         int rowCnt = in.readInt();
@@ -146,4 +168,9 @@ public class GridNextPageResponse implements Externalizable {
             }
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNextPageResponse.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index bc9dcf8..90a2d4c 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -18,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.util.typedef.*;
 import org.gridgain.testframework.junits.common.*;
 
 import java.util.*;
@@ -88,11 +90,37 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         return cc;
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoStep() throws Exception {
+        fillCaches();
+
+        GridCacheTwoStepQuery q = new GridCacheTwoStepQuery("select * from _cnts_");
+
+        q.addMapQuery("_cnts_", "select count(*) cnt from \"partitioned\".FactPurchase");
+
+        GridCacheQueriesEx<Integer, FactPurchase> qx =
+            (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache("partitioned").queries();
+
+        for (List<?> row : qx.execute(q).get())
+            X.println("__ "  + row);
+
+
+
+//        Object cnt = .next().get(0);
+//
+//        assertEquals(10L, cnt);
+    }
+
     /** @throws Exception If failed. */
     public void testOnProjection() throws Exception {
+        fillCaches();
+
         GridCacheProjection<Integer, FactPurchase> prj = ignite.<Integer, FactPurchase>cache("partitioned").projection(
             new IgnitePredicate<GridCacheEntry<Integer, FactPurchase>>() {
-                @Override public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
+                @Override
+                public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
                     return e.getKey() > 12;
                 }
             });
@@ -105,7 +133,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
     /**
      * @throws IgniteCheckedException If failed.
      */
-    private void fillCaches() throws IgniteCheckedException {
+    private void fillCaches() throws IgniteCheckedException, InterruptedException {
+        awaitPartitionMapExchange();
+
         int idGen = 0;
 
         GridCache<Integer, Object> dimCache = ignite.cache("replicated");