You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/19 13:12:44 UTC

[1/8] incubator-ignite git commit: ignite-gg9499 -

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-gg9499 [created] 67df1ce35


ignite-gg9499 -


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4da1d1a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4da1d1a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4da1d1a4

Branch: refs/heads/ignite-gg9499
Commit: 4da1d1a496c3eb8e0e73a397fcd02322f3508416
Parents: c67dcde
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 12 16:34:17 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 12 16:34:17 2014 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMergeIndex.java        | 162 +++++++++++++++++++
 .../query/h2/twostep/GridMergeTable.java        | 145 +++++++++++++++++
 .../query/h2/twostep/GridNextPageRequest.java   |  54 +++++++
 .../query/h2/twostep/GridNextPageResponse.java  |  47 ++++++
 .../query/h2/twostep/GridQueryAck.java          |  42 +++++
 .../query/h2/twostep/GridQueryRequest.java      |  50 ++++++
 6 files changed, 500 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
new file mode 100644
index 0000000..163256b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -0,0 +1,162 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.message.*;
+import org.h2.result.*;
+import org.h2.table.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Merge index.
+ */
+public class GridMergeIndex extends BaseIndex {
+    /** */
+    private static final int MAX_CAPACITY = 100_000;
+
+    /** */
+    private static final Collection<Row> END = new ArrayList<>(0);
+
+    /** */
+    private Collection<Collection<Row>> fetchedRows = new LinkedBlockingQueue<>();
+
+    /** */
+    private BlockingQueue<Collection<Row>> cursorRows = new LinkedBlockingQueue<>();
+
+    /** */
+    private int fetchedCnt;
+
+    /** */
+    private final AtomicInteger cnt = new AtomicInteger(0);
+
+    /** {@inheritDoc} */
+    @Override public long getRowCount(Session session) {
+        return cnt.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCountApproximation() {
+        return getRowCount(null);
+    }
+
+    /**
+     * @param cnt Count.
+     */
+    public void addCount(int cnt) {
+        this.cnt.addAndGet(cnt);
+    }
+
+    /**
+     * @param rows0 Rows.
+     */
+    public void addRows(Collection<Row> rows0) {
+        assert !rows0.isEmpty();
+
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkRename() {
+        throw DbException.getUnsupportedException("rename");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(Session session) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void add(Session session, Row row) {
+        throw DbException.getUnsupportedException("add");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(Session session, Row row) {
+        throw DbException.getUnsupportedException("remove row");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+        if (fetchedRows == null)
+            throw new GridRuntimeException("Rows were dropped out of result set.");
+
+        return new Cursor0();
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
+        return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(Session session) {
+        throw DbException.getUnsupportedException("remove index");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void truncate(Session session) {
+        throw DbException.getUnsupportedException("truncate");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canGetFirstOrLast() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor findFirstOrLast(Session session, boolean first) {
+        throw DbException.getUnsupportedException("findFirstOrLast");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean needRebuild() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDiskSpaceUsed() {
+        return 0;
+    }
+
+    private class Cursor0 implements Cursor {
+        /** */
+        private Row cur;
+
+        /** */
+        private Iterator<Row> curIter;
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return cur;
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            throw DbException.getUnsupportedException("previous");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
new file mode 100644
index 0000000..3f059fa
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -0,0 +1,145 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.h2.command.ddl.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.message.*;
+import org.h2.result.*;
+import org.h2.table.*;
+
+import java.util.*;
+
+/**
+ * Merge table for distributed queries.
+ */
+public class GridMergeTable extends TableBase {
+    /** */
+    private final ArrayList<Index> idxs = new ArrayList<>(1);
+
+    /** */
+    private final GridMergeIndex idx = new GridMergeIndex();
+
+    /**
+     * @param data Data.
+     */
+    public GridMergeTable(CreateTableData data) {
+        super(data);
+
+        idxs.add(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void lock(Session session, boolean exclusive, boolean force) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(Session ses) {
+        idx.close(ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unlock(Session s) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Index addIndex(Session session, String indexName, int indexId, IndexColumn[] cols,
+        IndexType indexType, boolean create, String indexComment) {
+        throw DbException.getUnsupportedException("addIndex");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeRow(Session session, Row row) {
+        throw DbException.getUnsupportedException("removeRow");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void truncate(Session session) {
+        throw DbException.getUnsupportedException("truncate");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addRow(Session session, Row row) {
+        throw DbException.getUnsupportedException("addRow");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkSupportAlter() {
+        throw DbException.getUnsupportedException("alter");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getTableType() {
+        return EXTERNAL_TABLE_ENGINE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridMergeIndex getScanIndex(Session session) {
+        return idx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Index getUniqueIndex() {
+        return null; // We don't have a PK.
+    }
+
+    /** {@inheritDoc} */
+    @Override public ArrayList<Index> getIndexes() {
+        return idxs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLockedExclusively() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaxDataModificationId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDeterministic() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canGetRowCount() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canDrop() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCount(Session ses) {
+        return idx.getRowCount(ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCountApproximation() {
+        return idx.getRowCountApproximation();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDiskSpaceUsed() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkRename() {
+        throw DbException.getUnsupportedException("rename");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
new file mode 100644
index 0000000..d550b3b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
@@ -0,0 +1,54 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+
+import java.nio.*;
+
+/**
+ * Request to fetch next page.
+ */
+public class GridNextPageRequest extends GridTcpCommunicationMessageAdapter {
+    /** */
+    private long reqId;
+
+    /** */
+    private long qryId;
+
+    /** */
+    private int qry;
+
+    /** */
+    private int offset;
+
+    /** */
+    private int pageSize;
+
+    @Override public boolean writeTo(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public boolean readFrom(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public byte directType() {
+        return 0;
+    }
+
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        return null;
+    }
+
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
new file mode 100644
index 0000000..d77215c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+import org.h2.value.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * TODO write doc
+ */
+public class GridNextPageResponse extends GridTcpCommunicationMessageAdapter {
+    /** */
+    private long reqId;
+
+    /** */
+    private Collection<Value[]> rows;
+
+    @Override public boolean writeTo(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public boolean readFrom(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public byte directType() {
+        return 0;
+    }
+
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        return null;
+    }
+
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
new file mode 100644
index 0000000..10e30ee
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
@@ -0,0 +1,42 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+
+import java.nio.*;
+
+/**
+ * TODO write doc
+ */
+public class GridQueryAck extends GridTcpCommunicationMessageAdapter {
+    /** */
+    private long reqId;
+
+    @Override public boolean writeTo(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public boolean readFrom(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public byte directType() {
+        return 0;
+    }
+
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        return null;
+    }
+
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
new file mode 100644
index 0000000..7a664fd
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
@@ -0,0 +1,50 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * TODO write doc
+ */
+public class GridQueryRequest extends GridTcpCommunicationMessageAdapter {
+    /** */
+    private long reqId;
+
+    /** */
+    private List<String> sqlQrys;
+
+    /** */
+    private List<Collection<Object>> params;
+
+
+    @Override public boolean writeTo(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public boolean readFrom(ByteBuffer buf) {
+        return false;
+    }
+
+    @Override public byte directType() {
+        return 0;
+    }
+
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        return null;
+    }
+
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+    }
+}


[5/8] incubator-ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499

Posted by se...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29823e67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29823e67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29823e67

Branch: refs/heads/ignite-gg9499
Commit: 29823e67eb6d187897219e6fe859e9cffd24658c
Parents: 3e62b4e 3032bee
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 09:00:51 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 09:00:51 2014 +0300

----------------------------------------------------------------------
 .../optimized-classnames.previous.properties    | 30621 -----------------
 .../optimized/optimized-classnames.properties   | 30621 -----------------
 .../dht/atomic/GridDhtAtomicCache.java          |     2 +-
 .../gridgain/grid/kernal/visor/VisorJob.java    |     6 +-
 .../grid/kernal/visor/cache/VisorCache.java     |     4 +-
 .../cache/VisorCacheAggregatedMetrics.java      |   417 +-
 .../kernal/visor/cache/VisorCacheMetrics.java   |   176 +-
 .../kernal/visor/cache/VisorCacheMetrics2.java  |   218 -
 .../cache/VisorCacheMetricsCollectorTask.java   |   104 +-
 .../cache/VisorCacheQueryAggregatedMetrics.java |   129 -
 .../visor/cache/VisorCacheQueryMetrics.java     |    44 +-
 .../visor/node/VisorGridConfiguration.java      |     2 +-
 .../VisorNodeConfigurationCollectorJob.java     |     2 +-
 .../grid/kernal/visor/util/VisorTaskUtils.java  |    42 +
 .../core/src/main/resources/gridgain.properties |     2 +-
 .../commands/cache/VisorCacheCommand.scala      |   127 +-
 .../visor/commands/vvm/VisorVvmCommand.scala    |     9 +-
 .../gridgain/visor/VisorRuntimeBaseSpec.scala   |     1 -
 .../org/gridgain/visor/VisorTextTableSpec.scala |     3 +-
 .../visor/commands/VisorArgListSpec.scala       |     3 +-
 .../commands/VisorFileNameCompleterSpec.scala   |     4 +-
 .../commands/alert/VisorAlertCommandSpec.scala  |     5 +-
 .../cache/VisorCacheClearCommandSpec.scala      |     9 +-
 .../commands/cache/VisorCacheCommandSpec.scala  |     6 +-
 .../cache/VisorCacheCompactCommandSpec.scala    |     9 +-
 .../config/VisorConfigurationCommandSpec.scala  |     6 +-
 .../cswap/VisorCacheSwapCommandSpec.scala       |     6 +-
 .../deploy/VisorDeployCommandSpec.scala         |     3 +-
 .../disco/VisorDiscoveryCommandSpec.scala       |     2 +-
 .../events/VisorEventsCommandSpec.scala         |     3 +-
 .../visor/commands/gc/VisorGcCommandSpec.scala  |     3 +-
 .../commands/help/VisorHelpCommandSpec.scala    |     3 +-
 .../commands/kill/VisorKillCommandSpec.scala    |     3 +-
 .../commands/log/VisorLogCommandSpec.scala      |     3 +-
 .../commands/mem/VisorMemoryCommandSpec.scala   |     3 +-
 .../commands/open/VisorOpenCommandSpec.scala    |     1 +
 .../commands/start/VisorStartCommandSpec.scala  |     3 +-
 .../commands/tasks/VisorTasksCommandSpec.scala  |    11 +-
 .../commands/vvm/VisorVvmCommandSpec.scala      |     3 +-
 .../testsuites/VisorConsoleSelfTestSuite.scala  |    13 +-
 pom.xml                                         |     7 +-
 41 files changed, 521 insertions(+), 62118 deletions(-)
----------------------------------------------------------------------



[8/8] incubator-ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499

Posted by se...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/67df1ce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/67df1ce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/67df1ce3

Branch: refs/heads/ignite-gg9499
Commit: 67df1ce352c28c0704e3309bb0267d2e0259f9c4
Parents: 50e88a6 47cf475
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:11:32 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:11:32 2014 +0300

----------------------------------------------------------------------
 ipc/shmem/ggshmem/Makefile.am                   |   2 +-
 ipc/shmem/ggshmem/Makefile.in                   |   2 +-
 .../java/META-INF/native/linux32/libggshmem.so  | Bin 136714 -> 0 bytes
 .../java/META-INF/native/linux64/libggshmem.so  | Bin 145904 -> 161925 bytes
 .../java/META-INF/native/osx/libggshmem.dylib   | Bin 29312 -> 32940 bytes
 5 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[4/8] incubator-ignite git commit: ignite-gg9499 -

Posted by se...@apache.org.
ignite-gg9499 -


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3e62b4eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3e62b4eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3e62b4eb

Branch: refs/heads/ignite-gg9499
Commit: 3e62b4eb7be6b91101cb6c09ea10fd5c5c250600
Parents: 193d9b3
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 09:00:29 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 09:00:29 2014 +0300

----------------------------------------------------------------------
 .../testsuites/GridUtilSelfTestSuite.java       |  1 -
 .../cache/GridCacheCrossCacheQuerySelfTest.java | 23 ++++++++++++--------
 2 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3e62b4eb/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
index 5c05824..b3eabd4 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
@@ -35,7 +35,6 @@ public class GridUtilSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridThreadPoolExecutorServiceSelfTest.class);
         suite.addTestSuite(GridUtilsSelfTest.class);
         suite.addTestSuite(GridSpinReadWriteLockSelfTest.class);
-        suite.addTestSuite(GridQueueSelfTest.class);
         suite.addTestSuite(GridStringBuilderFactorySelfTest.class);
         suite.addTestSuite(GridToStringBuilderSelfTest.class);
         suite.addTestSuite(GridByteArrayListSelfTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3e62b4eb/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 0720c91..bc9dcf8 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -103,14 +103,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Fills the caches with data and executes the query.
-     *
-     * @param prj Cache projection.
-     * @throws Exception If failed.
-     * @return Result.
+     * @throws IgniteCheckedException If failed.
      */
-    private List<Map.Entry<Integer, FactPurchase>> body(GridCacheProjection<Integer, FactPurchase> prj)
-        throws Exception {
+    private void fillCaches() throws IgniteCheckedException {
         int idGen = 0;
 
         GridCache<Integer, Object> dimCache = ignite.cache("replicated");
@@ -155,9 +150,19 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
 
             factCache.put(id, new FactPurchase(id, prod.getId(), store.getId()));
         }
+    }
 
-        GridCacheQuery<Map.Entry<Integer, FactPurchase>> qry = (prj == null ? factCache : prj).queries().createSqlQuery(
-            FactPurchase.class,
+    /**
+     * Fills the caches with data and executes the query.
+     *
+     * @param prj Cache projection.
+     * @throws Exception If failed.
+     * @return Result.
+     */
+    private List<Map.Entry<Integer, FactPurchase>> body(GridCacheProjection<Integer, FactPurchase> prj)
+        throws Exception {
+        GridCacheQuery<Map.Entry<Integer, FactPurchase>> qry = (prj == null ?
+            ignite.<Integer, FactPurchase>cache("partitioned") : prj).queries().createSqlQuery(FactPurchase.class,
             "from \"replicated\".DimStore, \"partitioned\".FactPurchase where DimStore.id = FactPurchase.storeId");
 
         List<Map.Entry<Integer, FactPurchase>> res = new ArrayList<>(qry.execute().get());


[7/8] incubator-ignite git commit: ignite-gg9499 - minor

Posted by se...@apache.org.
ignite-gg9499 - minor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50e88a64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50e88a64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50e88a64

Branch: refs/heads/ignite-gg9499
Commit: 50e88a642829b69e4ab9aefec115134913c6f5d8
Parents: 1226b24
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:11:05 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:11:05 2014 +0300

----------------------------------------------------------------------
 .../grid/kernal/processors/query/h2/twostep/GridMergeIndex.java    | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50e88a64/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 5d51b32..04a4c39 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -256,8 +256,6 @@ public abstract class GridMergeIndex extends BaseIndex {
                         fetched.add(cur);
                 }
 
-                X.println("__ row: " + Arrays.toString(cur.getValueList()));
-
                 return true;
             }
 


[3/8] incubator-ignite git commit: ignite-gg9499 -

Posted by se...@apache.org.
ignite-gg9499 -


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/193d9b32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/193d9b32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/193d9b32

Branch: refs/heads/ignite-gg9499
Commit: 193d9b32183cf4b35eb0bdb026c37147942bf15e
Parents: 4da1d1a
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Dec 18 19:29:25 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Dec 18 19:29:25 2014 +0300

----------------------------------------------------------------------
 .../org/gridgain/grid/kernal/GridTopic.java     |   5 +-
 .../cache/query/GridCacheQueriesEx.java         |   7 +
 .../cache/query/GridCacheQueriesImpl.java       |   5 +
 .../cache/query/GridCacheQueriesProxy.java      |  12 +
 .../cache/query/GridCacheQueryManager.java      |  26 +-
 .../cache/query/GridCacheSqlQuery.java          |  91 +++++
 .../cache/query/GridCacheSqlResult.java         |  19 ++
 .../cache/query/GridCacheTwoStepQuery.java      |  66 ++++
 .../processors/query/GridQueryIndexing.java     |  10 +
 .../processors/query/GridQueryProcessor.java    |  17 +
 .../java/org/gridgain/grid/util/GridQueue.java  | 340 -------------------
 .../gridgain/grid/util/GridQueueSelfTest.java   |  62 ----
 .../processors/query/h2/GridH2Indexing.java     | 130 ++++---
 .../query/h2/GridH2ResultSetIterator.java       |   3 +-
 .../query/h2/opt/GridH2IndexBase.java           |   5 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 263 ++++++++++++++
 .../query/h2/twostep/GridMergeIndex.java        | 131 +++++--
 .../h2/twostep/GridMergeIndexUnsorted.java      |  74 ++++
 .../query/h2/twostep/GridMergeTable.java        |  33 +-
 .../query/h2/twostep/GridNextPageRequest.java   |  54 ---
 .../query/h2/twostep/GridNextPageResponse.java  |  47 ---
 .../query/h2/twostep/GridQueryAck.java          |  42 ---
 .../query/h2/twostep/GridQueryRequest.java      |  50 ---
 .../h2/twostep/GridReduceQueryExecutor.java     | 199 +++++++++++
 .../query/h2/twostep/GridResultPage.java        |  76 +++++
 .../twostep/messages/GridNextPageRequest.java   |  59 ++++
 .../twostep/messages/GridNextPageResponse.java  | 149 ++++++++
 .../query/h2/twostep/messages/GridQueryAck.java |  34 ++
 .../twostep/messages/GridQueryFailResponse.java |  46 +++
 .../h2/twostep/messages/GridQueryRequest.java   |  61 ++++
 30 files changed, 1402 insertions(+), 714 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
index 5fedbd9..7ab61d9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
@@ -77,7 +77,10 @@ public enum GridTopic {
     TOPIC_TIME_SYNC,
 
     /** */
-    TOPIC_HADOOP;
+    TOPIC_HADOOP,
+
+    /** */
+    TOPIC_QUERY;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
index d1732fb..e854367 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
@@ -10,6 +10,7 @@
 package org.gridgain.grid.kernal.processors.cache.query;
 
 import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.query.*;
 
 import java.util.*;
@@ -41,4 +42,10 @@ public interface GridCacheQueriesEx<K, V> extends GridCacheQueries<K, V> {
      * @return Query.
      */
     public <R> GridCacheQuery<R> createSpiQuery();
+
+    /**
+     * @param qry Query.
+     * @return Future.
+     */
+    public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
index 3ba1ceb..f643cb2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
@@ -158,6 +158,11 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) {
+        return ctx.kernalContext().query().queryTwoStep(qry);
+    }
+
+    /** {@inheritDoc} */
     @Override public GridCacheContinuousQuery<K, V> createContinuousQuery() {
         return ctx.continuousQueries().createQuery(prj == null ? null : prj.predicate());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
index 9edcf6a..61f7ac7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
@@ -166,6 +166,18 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.execute(qry);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteFuture<?> rebuildIndexes(Class<?> cls) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
index 25c0668..5fbc366 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
@@ -51,7 +51,7 @@ import static org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType
 @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
     /** */
-    protected GridQueryProcessor idxProc;
+    protected GridQueryProcessor qryProc;
 
     /** */
     private String space;
@@ -78,7 +78,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
-        idxProc = cctx.kernalContext().query();
+        qryProc = cctx.kernalContext().query();
         space = cctx.name();
         maxIterCnt = cctx.config().getMaximumQueryIteratorCount();
 
@@ -165,7 +165,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             throw new IllegalStateException("Failed to get size (grid is stopping).");
 
         try {
-            return idxProc.size(space, valType);
+            return qryProc.size(space, valType);
         }
         finally {
             leaveBusy();
@@ -193,7 +193,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
 
         try {
-            return idxProc.rebuildIndexes(space, typeName);
+            return qryProc.rebuildIndexes(space, typeName);
         }
         finally {
             leaveBusy();
@@ -210,7 +210,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
 
         try {
-            return idxProc.rebuildAllIndexes();
+            return qryProc.rebuildAllIndexes();
         }
         finally {
             leaveBusy();
@@ -262,7 +262,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.onSwap(space, key);
+            qryProc.onSwap(space, key);
         }
         finally {
             leaveBusy();
@@ -282,7 +282,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.onUnswap(space, key, val, valBytes);
+            qryProc.onUnswap(space, key, val, valBytes);
         }
         finally {
             leaveBusy();
@@ -324,7 +324,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             if (val == null)
                 val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
 
-            idxProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime);
+            qryProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime);
         }
         finally {
             invalidateResultCache();
@@ -349,7 +349,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.remove(space, key);
+            qryProc.remove(space, key);
         }
         finally {
             invalidateResultCache();
@@ -368,7 +368,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.onUndeploy(space, ldr);
+            qryProc.onUndeploy(space, ldr);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -488,7 +488,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             taskName));
                     }
 
-                    iter = idxProc.query(space, qry.clause(), F.asList(args),
+                    iter = qryProc.query(space, qry.clause(), F.asList(args),
                         qry.queryClassName(), filter(qry));
 
                     break;
@@ -531,7 +531,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             taskName));
                     }
 
-                    iter = idxProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry));
+                    iter = qryProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry));
 
                     break;
 
@@ -650,7 +650,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             else {
                 assert qry.type() == SQL_FIELDS;
 
-                GridQueryFieldsResult qryRes = idxProc.queryFields(space, qry.clause(), F.asList(args), filter(qry));
+                GridQueryFieldsResult qryRes = qryProc.queryFields(space, qry.clause(), F.asList(args), filter(qry));
 
                 res.metaData(qryRes.metaData());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
new file mode 100644
index 0000000..025ea29
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
@@ -0,0 +1,91 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Query.
+ */
+public class GridCacheSqlQuery implements Externalizable {
+    /** */
+    private static final Object[] EMPTY_PARAMS = {};
+
+    /** */
+    String alias;
+
+    /** */
+    String qry;
+
+    /** */
+    Object[] params;
+
+    /**
+     * For {@link Externalizable}.
+     */
+    public GridCacheSqlQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param alias Alias.
+     * @param qry Query.
+     * @param params Query parameters.
+     */
+    GridCacheSqlQuery(String alias, String qry, Object[] params) {
+        A.ensure(!F.isEmpty(qry), "qry must not be empty");
+
+        this.alias = alias;
+        this.qry = qry;
+
+        this.params = F.isEmpty(params) ? EMPTY_PARAMS : params;
+    }
+
+    /**
+     * @return Alias.
+     */
+    public String alias() {
+        return alias;
+    }
+
+    /**
+     * @return Query.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @return Parameters.
+     */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, alias);
+        U.writeString(out, qry);
+        U.writeArray(out, params);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        alias = U.readString(in);
+        qry = U.readString(in);
+        params = U.readArray(in);
+
+        if (F.isEmpty(params))
+            params = EMPTY_PARAMS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
new file mode 100644
index 0000000..ecee21e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
@@ -0,0 +1,19 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import java.util.*;
+
+/**
+ * SQL Query result.
+ */
+public interface GridCacheSqlResult extends AutoCloseable, Iterable<List<?>> {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
new file mode 100644
index 0000000..a7c9a02
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -0,0 +1,66 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Two step map-reduce style query.
+ */
+public class GridCacheTwoStepQuery implements Serializable {
+    /** */
+    private Map<String, GridCacheSqlQuery> mapQrys;
+
+    /** */
+    private GridCacheSqlQuery reduce;
+
+    /**
+     * @param qry Reduce query.
+     * @param params Reduce query parameters.
+     */
+    public GridCacheTwoStepQuery(String qry, Object ... params) {
+        reduce = new GridCacheSqlQuery(null, qry, params);
+    }
+
+    /**
+     * @param alias Alias.
+     * @param qry SQL Query.
+     * @param params Query parameters.
+     */
+    public void addMapQuery(String alias, String qry, Object ... params) {
+        A.ensure(!F.isEmpty(alias), "alias must not be empty");
+
+        if (mapQrys == null)
+            mapQrys = new GridLeanMap<>();
+
+        if (mapQrys.put(alias, new GridCacheSqlQuery(alias, qry, params)) != null)
+            throw new IgniteException("Failed to add query, alias already exists: " + alias + ".");
+    }
+
+    /**
+     * @return Reduce query.
+     */
+    public GridCacheSqlQuery reduceQuery() {
+        return reduce;
+    }
+
+    /**
+     * @return Map queries.
+     */
+    public Collection<GridCacheSqlQuery> mapQueries() {
+        return mapQrys.values();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
index 18b2832..1b9ec6a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
@@ -13,6 +13,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.indexing.*;
 import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.util.lang.*;
 import org.jetbrains.annotations.*;
 
@@ -37,6 +38,15 @@ public interface GridQueryIndexing {
      */
     public void stop() throws IgniteCheckedException;
 
+
+    /**
+     * Runs two step query.
+     *
+     * @param qry Query.
+     * @return Future.
+     */
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry);
+
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
index 6bc0235..e05c425 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
@@ -18,6 +18,7 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.future.*;
 import org.gridgain.grid.util.lang.*;
@@ -428,6 +429,22 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param qry Query.
+     * @return Future.
+     */
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+        try {
+            return idx.queryTwoStep(qry);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
      * @param space Space.
      * @param key Key.
      * @throws IgniteCheckedException Thrown in case of any errors.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java b/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
deleted file mode 100644
index 1f6bbe4..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.util;
-
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Queue which supports addition at tail and removing at head. This
- * queue also exposes its internal linked list nodes and allows for
- * constant time removal from the middle of the queue.
- * <p>
- * This queue is not thread-safe.
- */
-public class GridQueue<E> extends AbstractCollection<E> implements Queue<E> {
-    /** Queue size. */
-    private int size;
-
-    /** Modification count. */
-    private int modCnt;
-
-    /** Queue header. */
-    private Node<E> hdr = new Node<>(null, null, null);
-
-    /**
-     * Creates empty queue.
-     */
-    public GridQueue() {
-        hdr.next = hdr.prev = hdr;
-    }
-
-    /**
-     * Handles modification count check.
-     *
-     * @param match Modification count to match.
-     */
-    private void checkModCount(int match) {
-        if (modCnt != match)
-            throw new ConcurrentModificationException("Mod count mismatch [expected=" + match +
-                ", actual=" + modCnt + ']');
-
-        modCnt++;
-    }
-
-    /**
-     * Adds element before node.
-     *
-     * @param e Element to add.
-     * @param n Node.
-     * @return New node.
-     */
-    private Node<E> addBefore(E e, Node<E> n) {
-        A.notNull(e, "e");
-
-        assert n != null;
-
-        int match = modCnt;
-
-        Node<E> newNode = new Node<>(e, n, n.prev);
-
-        // Link.
-        newNode.prev.next = newNode;
-        newNode.next.prev = newNode;
-
-        size++;
-
-        checkModCount(match);
-
-        return newNode;
-    }
-
-    /**
-     * Removes node.
-     *
-     * @param n Node to remove.
-     * @return Removed value.
-     */
-    private E remove(Node<E> n) {
-        assert n != null;
-
-        if (n == hdr)
-            throw new NoSuchElementException();
-
-        assert !n.unlinked();
-
-        int match = modCnt;
-
-        E res = n.item;
-
-        // Relink.
-        n.prev.next = n.next;
-        n.next.prev = n.prev;
-
-        // GC.
-        n.next = n.prev = null;
-        n.item = null;
-
-        size--;
-
-        checkModCount(match);
-
-        n.unlink();
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean add(E e) {
-        offer(e);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean remove(Object o) {
-        A.notNull(o, "o");
-
-        for (Node<E> n = hdr.next; n != hdr; n = n.next) {
-            if (o.equals(n.item)) {
-                remove(n);
-
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean offer(E e) {
-        addBefore(e, hdr);
-
-        return true;
-    }
-
-    /**
-     * Same as {@link #offer(Object)}, but returns created node.
-     *
-     * @param e Element to add.
-     * @return New node.
-     */
-    public Node<E> offerx(E e) {
-        return addBefore(e, hdr);
-    }
-
-    /**
-     * Polls element from head of the queue.
-     *
-     * @return Polled element.
-     */
-    @Nullable @Override public E poll() {
-        if (size == 0)
-            return null;
-
-        return remove(hdr.next);
-    }
-
-    /** {@inheritDoc} */
-    @Override public E element() {
-        Node<E> n = hdr.next;
-
-        if (n == null)
-            throw new NoSuchElementException();
-
-        return n.item;
-    }
-
-    /** {@inheritDoc} */
-    @Override public E remove() {
-        E item = poll();
-
-        if (item == null)
-            throw new NoSuchElementException();
-
-        return item;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public E peek() {
-        return hdr.next.item;
-    }
-
-    /**
-     * @return Peeks at first node in the queue.
-     */
-    public Node<E> peekx() {
-        return hdr.next == hdr ? null : hdr.next;
-    }
-
-    /**
-     * Unlinks node from the queue.
-     *
-     * @param n Node to unlink.
-     */
-    public void unlink(Node<E> n) {
-        A.notNull(n, "n");
-
-        remove(n);
-    }
-
-    /**
-     * Gets queue size.
-     *
-     * @return Queue size.
-     */
-    @Override public int size() {
-        return size;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<E> iterator() {
-        return new QueueIterator();
-    }
-
-    /**
-     * Node for internal linked list.
-     *
-     * @param <E> Queue element.
-     */
-    @SuppressWarnings( {"PublicInnerClass"})
-    public static class Node<E> {
-        /** Item. */
-        private E item;
-
-        /** Next. */
-        @GridToStringExclude
-        private Node<E> next;
-
-        /** Previous. */
-        @GridToStringExclude
-        private Node<E> prev;
-
-        /** Unlinked flag. */
-        private boolean unlinked;
-
-        /**
-         * @param item Item.
-         * @param next Next link.
-         * @param prev Previous link.
-         */
-        private Node(E item, Node<E> next, Node<E> prev) {
-            this.item = item;
-            this.next = next;
-            this.prev = prev;
-        }
-
-        /**
-         * Gets this node's item.
-         *
-         * @return This node's item.
-         */
-        public E item() {
-            return item;
-        }
-
-        /**
-         * Sets unlinked flag.
-         */
-        void unlink() {
-            assert !unlinked;
-
-            unlinked = true;
-        }
-
-        /**
-         * Checks if node is unlinked.
-         *
-         * @return {@code True} if node is unlinked.
-         */
-        public boolean unlinked() {
-            return unlinked;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Node.class, this);
-        }
-    }
-
-    /**
-     * Iterator.
-     */
-    private class QueueIterator implements Iterator<E> {
-        /** Next element. */
-        private Node<E> next;
-
-        /** Expected modification count. */
-        private int expModCnt = modCnt;
-
-        /**
-         *
-         */
-        QueueIterator() {
-            next = hdr.next;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return next != hdr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public E next() {
-            checkModCount();
-
-            if (next == null)
-                throw new NoSuchElementException();
-
-            E ret = next.item;
-
-            next = next.next;
-
-            return ret;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        /**
-         * Checks modification count.
-         */
-        private void checkModCount() {
-            if (modCnt != expModCnt)
-                throw new ConcurrentModificationException("Mod count mismatch [expected=" + expModCnt +
-                    ", actual=" + modCnt + ']');
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
deleted file mode 100644
index 4b613f6..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.util;
-
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.testframework.junits.common.*;
-
-/**
- * Grid utils tests.
- */
-@GridCommonTest(group = "Utils")
-public class GridQueueSelfTest extends GridCommonAbstractTest {
-    /**
-     *
-     */
-    public void testQueue() {
-        GridQueue<String> q = new GridQueue<>();
-        for (char c = 'a'; c <= 'z'; c++)
-            q.offer(Character.toString(c));
-
-        assertEquals('z' - 'a' + 1, q.size());
-
-        char ch = 'a';
-
-        for (String c = q.poll(); c != null; c = q.poll()) {
-            X.println(c);
-
-            assertEquals(Character.toString(ch++), c);
-        }
-
-        assert q.isEmpty();
-
-        for (char c = 'A'; c <= 'Z'; c++)
-            q.offer(Character.toString(c));
-
-        assertEquals('Z' - 'A' + 1, q.size());
-
-        ch = 'A';
-
-        for (String s : q) {
-            X.println(s);
-
-            assertEquals(Character.toString(ch++), s);
-        }
-
-        q.remove("O");
-
-        assertEquals('Z' - 'A', q.size());
-
-        for (String c = q.poll(); c != null; c = q.poll())
-            assert !"O".equals(c);
-
-        assert q.isEmpty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
index 7307676..7ee84f8 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
@@ -16,13 +16,14 @@ import org.apache.ignite.marshaller.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.kernal.processors.query.*;
 import org.gridgain.grid.kernal.processors.query.h2.opt.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.offheap.unsafe.*;
@@ -112,9 +113,6 @@ public class GridH2Indexing implements GridQueryIndexing {
     }
 
     /** */
-    private static final ThreadLocal<GridH2Indexing> localSpi = new ThreadLocal<>();
-
-    /** */
     private volatile String cachedSearchPathCmd;
 
     /** Cache for deserialized offheap rows. */
@@ -148,6 +146,12 @@ public class GridH2Indexing implements GridQueryIndexing {
     private final Collection<Connection> conns = Collections.synchronizedCollection(new ArrayList<Connection>());
 
     /** */
+    private GridMapQueryExecutor mapQryExec;
+
+    /** */
+    private GridReduceQueryExecutor rdcQryExec;
+
+    /** */
     private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
         @Nullable @Override public ConnectionWrapper get() {
             ConnectionWrapper c = super.get();
@@ -218,6 +222,15 @@ public class GridH2Indexing implements GridQueryIndexing {
     private volatile GridKernalContext ctx;
 
     /**
+     * @param space Space.
+     * @return Connection.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Connection connectionForSpace(@Nullable String space) throws IgniteCheckedException {
+        return connectionForThread(schema(space));
+    }
+
+    /**
      * Gets DB connection.
      *
      * @param schema Whether to set schema for connection or not.
@@ -370,22 +383,15 @@ public class GridH2Indexing implements GridQueryIndexing {
         if (tbl == null)
             return; // Type was rejected.
 
-        localSpi.set(this);
+        removeKey(spaceName, k, tbl);
 
-        try {
-            removeKey(spaceName, k, tbl);
-
-            if (expirationTime == 0)
-                expirationTime = Long.MAX_VALUE;
+        if (expirationTime == 0)
+            expirationTime = Long.MAX_VALUE;
 
-            tbl.tbl.update(k, v, expirationTime);
+        tbl.tbl.update(k, v, expirationTime);
 
-            if (tbl.luceneIdx != null)
-                tbl.luceneIdx.store(k, v, ver, expirationTime);
-        }
-        finally {
-            localSpi.remove();
-        }
+        if (tbl.luceneIdx != null)
+            tbl.luceneIdx.store(k, v, ver, expirationTime);
     }
 
     /** {@inheritDoc} */
@@ -393,23 +399,16 @@ public class GridH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']');
 
-        localSpi.set(this);
+        for (TableDescriptor tbl : tables(schema(spaceName))) {
+            if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                if (tbl.tbl.update(key, null, 0)) {
+                    if (tbl.luceneIdx != null)
+                        tbl.luceneIdx.remove(key);
 
-        try {
-            for (TableDescriptor tbl : tables(schema(spaceName))) {
-                if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
-                    if (tbl.tbl.update(key, null, 0)) {
-                        if (tbl.luceneIdx != null)
-                            tbl.luceneIdx.remove(key);
-
-                        return;
-                    }
+                    return;
                 }
             }
         }
-        finally {
-            localSpi.remove();
-        }
     }
 
     /** {@inheritDoc} */
@@ -419,47 +418,33 @@ public class GridH2Indexing implements GridQueryIndexing {
         if (schema == null)
             return;
 
-        localSpi.set(this);
-
-        try {
-            for (TableDescriptor tbl : schema.values()) {
-                if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
-                    try {
-                        if (tbl.tbl.onSwap(key))
-                            return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteCheckedException(e);
-                    }
+        for (TableDescriptor tbl : schema.values()) {
+            if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                try {
+                    if (tbl.tbl.onSwap(key))
+                        return;
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteCheckedException(e);
                 }
             }
         }
-        finally {
-            localSpi.remove();
-        }
     }
 
     /** {@inheritDoc} */
     @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
         throws IgniteCheckedException {
-        localSpi.set(this);
-
-        try {
-            for (TableDescriptor tbl : tables(schema(spaceName))) {
-                if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
-                    try {
-                        if (tbl.tbl.onUnswap(key, val))
-                            return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteCheckedException(e);
-                    }
+        for (TableDescriptor tbl : tables(schema(spaceName))) {
+            if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                try {
+                    if (tbl.tbl.onUnswap(key, val))
+                        return;
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteCheckedException(e);
                 }
             }
         }
-        finally {
-            localSpi.remove();
-        }
     }
 
     /**
@@ -540,8 +525,6 @@ public class GridH2Indexing implements GridQueryIndexing {
     @Override public <K, V> GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry,
         @Nullable final Collection<Object> params, final GridIndexingQueryFilter filters)
         throws IgniteCheckedException {
-        localSpi.set(this);
-
         setFilters(filters);
 
         try {
@@ -575,8 +558,6 @@ public class GridH2Indexing implements GridQueryIndexing {
         }
         finally {
             setFilters(null);
-
-            localSpi.remove();
         }
     }
 
@@ -652,7 +633,7 @@ public class GridH2Indexing implements GridQueryIndexing {
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
-    private ResultSet executeSqlQueryWithTimer(Connection conn, String sql,
+    public ResultSet executeSqlQueryWithTimer(Connection conn, String sql,
         @Nullable Collection<Object> params) throws IgniteCheckedException {
         long start = U.currentTimeMillis();
 
@@ -719,7 +700,7 @@ public class GridH2Indexing implements GridQueryIndexing {
      * @param params Parameters collection.
      * @throws IgniteCheckedException If failed.
      */
-    private void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException {
+    public void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException {
         if (!F.isEmpty(params)) {
             int idx = 1;
 
@@ -751,8 +732,6 @@ public class GridH2Indexing implements GridQueryIndexing {
 
         setFilters(filters);
 
-        localSpi.set(this);
-
         try {
             ResultSet rs = executeQuery(qry, params, tbl);
 
@@ -760,11 +739,14 @@ public class GridH2Indexing implements GridQueryIndexing {
         }
         finally {
             setFilters(null);
-
-            localSpi.remove();
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) {
+        return rdcQryExec.query(qry);
+    }
+
     /**
      * Sets filters for current thread. Must be set to not null value
      * before executeQuery and reset to null after in finally block since it signals
@@ -772,7 +754,7 @@ public class GridH2Indexing implements GridQueryIndexing {
      *
      * @param filters Filters.
      */
-    private void setFilters(@Nullable GridIndexingQueryFilter filters) {
+    public void setFilters(@Nullable GridIndexingQueryFilter filters) {
         GridH2IndexBase.setFiltersForThread(filters);
     }
 
@@ -1115,6 +1097,12 @@ public class GridH2Indexing implements GridQueryIndexing {
 
             for (GridCacheConfiguration cacheCfg : ctx.config().getCacheConfiguration())
                 registerSpace(cacheCfg.getName());
+
+            mapQryExec = new GridMapQueryExecutor();
+            rdcQryExec = new GridReduceQueryExecutor();
+
+            mapQryExec.start(ctx, this);
+            rdcQryExec.start(ctx, this);
         }
 
         System.setProperty("h2.serializeJavaObject", "false");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
index 61f1190..90aa454 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
@@ -10,6 +10,7 @@
 package org.gridgain.grid.kernal.processors.query.h2;
 
 import org.apache.ignite.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.typedef.internal.*;
 
@@ -20,7 +21,7 @@ import java.util.*;
 /**
  * Iterator over result set.
  */
-abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> {
+public abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
index 00cb06d..4dcfd73 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
@@ -49,7 +49,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param fs Filters.
      */
     public static void setFiltersForThread(GridIndexingQueryFilter fs) {
-        filters.set(fs);
+        if (fs == null)
+            filters.remove();
+        else
+            filters.set(fs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
new file mode 100644
index 0000000..b86caf1
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -0,0 +1,263 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.indexing.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.kernal.processors.query.h2.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.typedef.*;
+import org.h2.jdbc.*;
+import org.h2.result.*;
+import org.h2.value.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.lang.reflect.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Map query executor.
+ */
+public class GridMapQueryExecutor {
+    /** */
+    private static final Field RESULT_FIELD;
+
+    /**
+     * Initialize.
+     */
+    static {
+        try {
+            RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
+
+            RESULT_FIELD.setAccessible(true);
+        }
+        catch (NoSuchFieldException e) {
+            throw new IllegalStateException("Check H2 version in classpath.", e);
+        }
+    }
+
+    /** */
+    private IgniteLogger log;
+
+    /** */
+    private GridKernalContext ctx;
+
+    /** */
+    private GridH2Indexing h2;
+
+    /** */
+    private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>();
+
+    /**
+     * @param ctx Context.
+     * @param h2 H2 Indexing.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException {
+        this.ctx = ctx;
+        this.h2 = h2;
+
+        log = ctx.log(GridMapQueryExecutor.class);
+
+        // TODO handle node failures.
+
+        ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                assert msg != null;
+
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (msg instanceof GridQueryRequest)
+                    executeLocalQuery(node, (GridQueryRequest)msg);
+                else if (msg instanceof GridNextPageRequest)
+                    sendNextPage(node, (GridNextPageRequest)msg);
+
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @param node Node.
+     * @param req Query request.
+     */
+    private void executeLocalQuery(ClusterNode node, GridQueryRequest req) {
+        h2.setFilters(new GridIndexingQueryFilter() {
+            @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) {
+                final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName);
+
+                if (cache.context().isReplicated() || cache.configuration().getBackups() == 0)
+                    return null;
+
+                return new IgniteBiPredicate<K, V>() {
+                    @Override public boolean apply(K k, V v) {
+                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1);
+                    }
+                };
+            }
+        });
+
+        try {
+            QueryResults qr = new QueryResults(req.requestId(), req.queries().size());
+
+            ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+
+            if (nodeRess == null) {
+                nodeRess = new ConcurrentHashMap8<>();
+
+                ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(node.id(), nodeRess);
+
+                if (old != null)
+                    nodeRess = old;
+            }
+
+            QueryResults old = nodeRess.putIfAbsent(req.requestId(), qr);
+
+            assert old == null;
+
+            // Prepare snapshots for all the needed tables before actual run.
+            for (GridCacheSqlQuery qry : req.queries()) {
+                // TODO
+            }
+
+            // Run queries.
+            int i = 0;
+
+            for (GridCacheSqlQuery qry : req.queries()) {
+                ResultSet rs = h2.executeSqlQueryWithTimer(h2.connectionForSpace(null), qry.query(),
+                    F.asList(qry.parameters()));
+
+                assert rs instanceof JdbcResultSet : rs.getClass();
+
+                ResultInterface res = (ResultInterface)RESULT_FIELD.get(rs);
+
+                qr.results[i] = res;
+                qr.resultSets[i] = rs;
+
+                // Send the first page.
+                sendNextPage(node, qr, i, req.pageSize(), res.getRowCount());
+
+                i++;
+            }
+        }
+        catch (Throwable e) {
+            sendError(node, req.requestId(), e);
+        }
+        finally {
+            h2.setFilters(null);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param qryReqId Query request ID.
+     * @param err Error.
+     */
+    private void sendError(ClusterNode node, long qryReqId, Throwable err) {
+        try {
+            ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err));
+        }
+        catch (IgniteCheckedException e) {
+            e.addSuppressed(err);
+
+            log.error("Failed to send error message.", e);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param req Request.
+     */
+    private void sendNextPage(ClusterNode node, GridNextPageRequest req) {
+        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+
+        QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId());
+
+        if (qr == null)
+            sendError(node, req.queryRequestId(),
+                new IllegalStateException("No query result found for request: " + req));
+        else
+            sendNextPage(node, qr, req.query(), req.pageSize(), -1);
+    }
+
+    /**
+     * @param node Node.
+     * @param qr Query results.
+     * @param qry Query.
+     * @param pageSize Page size.
+     * @param allRows All rows count.
+     */
+    private void sendNextPage(ClusterNode node, QueryResults qr, int qry, int pageSize, int allRows) {
+        int page;
+
+        List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
+
+        ResultInterface res = qr.results[qry];
+
+        assert res != null;
+
+        synchronized (res) {
+            page = qr.pages[qry]++;
+
+            for (int i = 0 ; i < pageSize; i++) {
+                if (!res.next())
+                    break;
+
+                rows.add(res.currentRow());
+            }
+        }
+
+        try {
+            ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows));
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Failed to send message.", e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class QueryResults {
+        /** */
+        private long qryReqId;
+
+        /** */
+        private ResultInterface[] results;
+
+        /** */
+        private ResultSet[] resultSets;
+
+        /** */
+        private int[] pages;
+
+        /**
+         * @param qryReqId Query request ID.
+         * @param qrys Queries.
+         */
+        private QueryResults(long qryReqId, int qrys) {
+            this.qryReqId = qryReqId;
+
+            results = new ResultInterface[qrys];
+            resultSets = new ResultSet[qrys];
+            pages = new int[qrys];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 163256b..16ba15d 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -9,7 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
-import org.gridgain.grid.*;
+import org.apache.ignite.*;
 import org.h2.engine.*;
 import org.h2.index.*;
 import org.h2.message.*;
@@ -18,31 +18,23 @@ import org.h2.table.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 /**
  * Merge index.
  */
-public class GridMergeIndex extends BaseIndex {
+public abstract class GridMergeIndex extends BaseIndex {
     /** */
-    private static final int MAX_CAPACITY = 100_000;
-
-    /** */
-    private static final Collection<Row> END = new ArrayList<>(0);
-
-    /** */
-    private Collection<Collection<Row>> fetchedRows = new LinkedBlockingQueue<>();
-
-    /** */
-    private BlockingQueue<Collection<Row>> cursorRows = new LinkedBlockingQueue<>();
-
-    /** */
-    private int fetchedCnt;
+    private static final int MAX_FETCH_SIZE = 100000;
 
     /** */
     private final AtomicInteger cnt = new AtomicInteger(0);
 
+    /**
+     * Will be r/w from query execution thread only, does not need to be threadsafe.
+     */
+    private ArrayList<Row> fetched = new ArrayList<>();
+
     /** {@inheritDoc} */
     @Override public long getRowCount(Session session) {
         return cnt.get();
@@ -61,12 +53,36 @@ public class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @param rows0 Rows.
+     * @param page Page.
      */
-    public void addRows(Collection<Row> rows0) {
-        assert !rows0.isEmpty();
+    public abstract void addPage(GridResultPage<?> page);
+
+    /** {@inheritDoc} */
+    @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+        if (fetched == null)
+            throw new IgniteException("Fetched result set was too large.");
 
+        if (fetched.size() == cnt.get())  // We've fetched all the rows.
+            return findAllFetched(fetched, first, last);
 
+        return findInStream(first, last);
+    }
+
+    /**
+     * @param first First row.
+     * @param last Last row.
+     * @return Cursor. Usually it must be {@link FetchingCursor} instance.
+     */
+    protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last);
+
+    /**
+     * @param fetched Fetched rows.
+     * @param first First row.
+     * @param last Last row.
+     * @return Cursor.
+     */
+    protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
+        return new IteratorCursor(fetched.iterator());
     }
 
     /** {@inheritDoc} */
@@ -90,14 +106,6 @@ public class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
-        if (fetchedRows == null)
-            throw new GridRuntimeException("Rows were dropped out of result set.");
-
-        return new Cursor0();
-    }
-
-    /** {@inheritDoc} */
     @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
         return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
     }
@@ -132,12 +140,24 @@ public class GridMergeIndex extends BaseIndex {
         return 0;
     }
 
-    private class Cursor0 implements Cursor {
+    /**
+     * Cursor over iterator.
+     */
+    protected class IteratorCursor implements Cursor {
         /** */
-        private Row cur;
+        private Iterator<Row> iter;
 
         /** */
-        private Iterator<Row> curIter;
+        protected Row cur;
+
+        /**
+         * @param iter Iterator.
+         */
+        public IteratorCursor(Iterator<Row> iter) {
+            assert iter != null;
+
+            this.iter = iter;
+        }
 
         /** {@inheritDoc} */
         @Override public Row get() {
@@ -151,7 +171,9 @@ public class GridMergeIndex extends BaseIndex {
 
         /** {@inheritDoc} */
         @Override public boolean next() {
-            return false;
+            cur = iter.hasNext() ? iter.next() : null;
+
+            return cur != null;
         }
 
         /** {@inheritDoc} */
@@ -159,4 +181,51 @@ public class GridMergeIndex extends BaseIndex {
             throw DbException.getUnsupportedException("previous");
         }
     }
+
+    /**
+     * Fetching cursor.
+     */
+    protected abstract class FetchingCursor extends IteratorCursor {
+        /** */
+        private boolean canFetch = true;
+
+        /**
+         */
+        public FetchingCursor() {
+            super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator());
+        }
+
+        /**
+         * @return Next row or {@code null} if none available.
+         */
+        @Nullable protected abstract Row fetchNext();
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (super.next())
+                return true;
+
+            if (!canFetch)
+                return false;
+
+            cur = fetchNext();
+
+            if (cur == null) { // No more results to fetch.
+                assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get();
+
+                canFetch = false;
+
+                return false;
+            }
+
+            if (fetched != null) { // Try to reuse fetched result.
+                fetched.add(cur);
+
+                if (fetched.size() == MAX_FETCH_SIZE)
+                    fetched = null; // Throw away fetched result if it is too large.
+            }
+
+            return true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
new file mode 100644
index 0000000..c6aaea9
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -0,0 +1,74 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.h2.index.*;
+import org.h2.result.*;
+import org.h2.value.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Unsorted merge index.
+ */
+public class GridMergeIndexUnsorted extends GridMergeIndex {
+    /** */
+    private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
+
+    /** {@inheritDoc} */
+    @Override public void addPage(GridResultPage<?> page) {
+        queue.add(page);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
+        final GridResultPage<?> p = queue.poll();
+
+        assert p != null; // First page must be already fetched.
+
+        if (p.isEmpty())
+            return new IteratorCursor(Collections.<Row>emptyIterator());
+
+        p.fetchNextPage(); // We always request next page before reading this one.
+
+        return new FetchingCursor() {
+            /** */
+            Iterator<Value[]> iter = p.rows().iterator();
+
+            @Nullable @Override protected Row fetchNext() {
+                if (!iter.hasNext()) {
+                    GridResultPage<?> page;
+
+                    try {
+                        page = queue.take();
+                    }
+                    catch (InterruptedException e) {
+                        throw new IgniteException("Query execution was interrupted.", e);
+                    }
+
+                    if (page.isEmpty()) {
+                        assert queue.isEmpty() : "It must be the last page.";
+
+                        return null; // Empty page - we are done.
+                    }
+
+                    page.fetchNextPage();
+
+                    iter = page.rows().iterator();
+                }
+
+                return new Row(iter.next(), 0);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
index 3f059fa..a1f2213 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
+import org.h2.api.*;
 import org.h2.command.ddl.*;
 import org.h2.engine.*;
 import org.h2.index.*;
@@ -26,7 +27,7 @@ public class GridMergeTable extends TableBase {
     private final ArrayList<Index> idxs = new ArrayList<>(1);
 
     /** */
-    private final GridMergeIndex idx = new GridMergeIndex();
+    private final GridMergeIndex idx = new GridMergeIndexUnsorted();
 
     /**
      * @param data Data.
@@ -142,4 +143,34 @@ public class GridMergeTable extends TableBase {
     @Override public void checkRename() {
         throw DbException.getUnsupportedException("rename");
     }
+
+    /**
+     * Engine.
+     */
+    public static class Engine implements TableEngine {
+        /** */
+        private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>();
+
+        /**
+         * @return Created table.
+         */
+        public static GridMergeTable getCreated() {
+            GridMergeTable tbl = createdTbl.get();
+
+            assert tbl != null;
+
+            createdTbl.remove();
+
+            return tbl;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Table createTable(CreateTableData data) {
+            GridMergeTable tbl = new GridMergeTable(data);
+
+            createdTbl.set(tbl);
+
+            return tbl;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
deleted file mode 100644
index d550b3b..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-
-/**
- * Request to fetch next page.
- */
-public class GridNextPageRequest extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    /** */
-    private long qryId;
-
-    /** */
-    private int qry;
-
-    /** */
-    private int offset;
-
-    /** */
-    private int pageSize;
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
deleted file mode 100644
index d77215c..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-import org.h2.value.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * TODO write doc
- */
-public class GridNextPageResponse extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    /** */
-    private Collection<Value[]> rows;
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
deleted file mode 100644
index 10e30ee..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-
-/**
- * TODO write doc
- */
-public class GridQueryAck extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
deleted file mode 100644
index 7a664fd..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * TODO write doc
- */
-public class GridQueryRequest extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    /** */
-    private List<String> sqlQrys;
-
-    /** */
-    private List<Collection<Object>> params;
-
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
new file mode 100644
index 0000000..3e7e12c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -0,0 +1,199 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.managers.communication.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.kernal.processors.query.h2.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Reduce query executor.
+ */
+public class GridReduceQueryExecutor {
+    /** */
+    private GridKernalContext ctx;
+
+    /** */
+    private GridH2Indexing h2;
+
+    /** */
+    private IgniteLogger log;
+
+    /** */
+    private final AtomicLong reqIdGen = new AtomicLong();
+
+    /** */
+    private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+
+    /**
+     * @param ctx Context.
+     * @param h2 H2 Indexing.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException {
+        this.ctx = ctx;
+        this.h2 = h2;
+
+        log = ctx.log(GridReduceQueryExecutor.class);
+
+        // TODO handle node failure.
+
+        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg != null;
+
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (msg instanceof GridNextPageResponse)
+                    onNextPage(node, (GridNextPageResponse)msg);
+                else if (msg instanceof GridQueryFailResponse)
+                    onFail(node, (GridQueryFailResponse)msg);
+            }
+        });
+    }
+
+    private void onFail(ClusterNode node, GridQueryFailResponse msg) {
+        U.error(log, "Failed to execute query.", msg.error());
+    }
+
+    private void onNextPage(final ClusterNode node, GridNextPageResponse msg) {
+        final long qryReqId = msg.queryRequestId();
+        final int qry = msg.query();
+        final int pageSize = msg.rows().size();
+
+        QueryRun r = runs.get(qryReqId);
+
+        GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
+
+        idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) {
+            @Override public void fetchNextPage() {
+                try {
+                    ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+
+        if (msg.allRows() != -1) { // Only the first page contains row count.
+            idx.addCount(msg.allRows());
+
+            r.latch.countDown();
+        }
+    }
+
+    public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
+        long qryReqId = reqIdGen.incrementAndGet();
+
+        QueryRun r = new QueryRun();
+
+        r.tbls = new ArrayList<>();
+
+        try {
+            r.conn = h2.connectionForSpace(null);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+
+        for (GridCacheSqlQuery mapQry : qry.mapQueries())
+            r.tbls.add(createTable(r.conn, mapQry));
+
+        Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow?
+
+        r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+
+        this.runs.put(qryReqId, r);
+
+        try {
+            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size
+
+            r.latch.await();
+
+            GridCacheSqlQuery rdc = qry.reduceQuery();
+
+            final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, rdc.query(), F.asList(rdc.parameters()));
+
+            return new GridFinishedFuture(ctx, new Iter(res));
+        }
+        catch (IgniteCheckedException | InterruptedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+    }
+
+    private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) {
+        try {
+            try (PreparedStatement s = conn.prepareStatement(
+                "CREATE LOCAL TEMPORARY TABLE " + qry.alias() +
+                " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " +
+                " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) {
+                h2.bindParameters(s, F.asList(qry.parameters()));
+
+                s.execute();
+            }
+
+            return GridMergeTable.Engine.getCreated();
+        }
+        catch (SQLException|IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class QueryRun {
+        /** */
+        private List<GridMergeTable> tbls;
+
+        /** */
+        private CountDownLatch latch;
+
+        /** */
+        private Connection conn;
+    }
+
+    /**
+     *
+     */
+    private static class Iter extends GridH2ResultSetIterator<List<?>> implements GridCacheSqlResult {
+        /**
+         * @param data Data array.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected Iter(ResultSet data) throws IgniteCheckedException {
+            super(data);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected List<?> createRow() {
+            ArrayList<Object> res = new ArrayList<>(row.length);
+
+            Collections.addAll(res, row);
+
+            return res;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
new file mode 100644
index 0000000..6c6ab6a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -0,0 +1,76 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.h2.result.*;
+import org.h2.value.*;
+
+import java.util.*;
+
+/**
+ * Page result.
+ */
+public abstract class GridResultPage<S> {
+    /** */
+    private final S src;
+
+    /** */
+    private final Collection<Value[]> rows;
+
+    /** */
+    private final int page;
+
+    /**
+     * @param src Source.
+     * @param page Page.
+     * @param rows Page rows.
+     */
+    protected GridResultPage(S src, int page, Collection<Value[]> rows) {
+        assert src != null;
+        assert rows != null;
+
+        this.src = src;
+        this.page = page;
+        this.rows = rows;
+    }
+
+    /**
+     * @return Result source.
+     */
+    public S source() {
+        return src;
+    }
+
+    /**
+     * @return Page.
+     */
+    public int page() {
+        return page;
+    }
+
+    /**
+     * @return {@code true} If result is empty.
+     */
+    public boolean isEmpty() {
+        return rows.isEmpty();
+    }
+
+    /**
+     * @return Page rows.
+     */
+    public Collection<Value[]> rows() {
+        return rows;
+    }
+
+    /**
+     * Request next page.
+     */
+    public abstract void fetchNextPage();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
new file mode 100644
index 0000000..e1eb905
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
@@ -0,0 +1,59 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+
+import java.io.*;
+
+/**
+ * Request to fetch next page.
+ */
+public class GridNextPageRequest implements Serializable {
+    /** */
+    private long qryReqId;
+
+    /** */
+    private int qry;
+
+    /** */
+    private int pageSize;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param pageSize Page size.
+     */
+    public GridNextPageRequest(long qryReqId, int qry, int pageSize) {
+        this.qryReqId = qryReqId;
+        this.qry = qry;
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Query.
+     */
+    public int query() {
+        return qry;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
new file mode 100644
index 0000000..de38172
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -0,0 +1,149 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import org.h2.store.*;
+import org.h2.value.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Next page response.
+ */
+public class GridNextPageResponse implements Externalizable {
+    /** */
+    private long qryReqId;
+
+    /** */
+    private int qry;
+
+    /** */
+    private int page;
+
+    /** */
+    private int allRows;
+
+    /** */
+    private Collection<Value[]> rows;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param page Page.
+     * @param allRows All rows count.
+     * @param rows Rows.
+     */
+    public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) {
+        assert rows != null;
+
+        this.qryReqId = qryReqId;
+        this.qry = qry;
+        this.page = page;
+        this.allRows = allRows;
+        this.rows = rows;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Query.
+     */
+    public int query() {
+        return qry;
+    }
+
+    /**
+     * @return Page.
+     */
+    public int page() {
+        return page;
+    }
+
+    /**
+     * @return All rows.
+     */
+    public int allRows() {
+        return allRows;
+    }
+
+    /**
+     * @return Rows.
+     */
+    public Collection<Value[]> rows() {
+        return rows;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(qryReqId);
+        out.writeInt(qry);
+        out.writeInt(page);
+        out.writeInt(allRows);
+
+        out.writeInt(rows.size());
+
+        if (rows.isEmpty())
+            return;
+
+        Data data = Data.create(null, 512);
+
+        boolean first = true;
+
+        for (Value[] row : rows) {
+            if (first) {
+                out.writeInt(row.length);
+
+                first = false;
+            }
+
+            for (Value val : row)
+                data.writeValue(val);
+        }
+
+        out.writeInt(data.length());
+        out.write(data.getBytes(), 0, data.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        qryReqId = in.readLong();
+        qry = in.readInt();
+        page = in.readInt();
+        allRows = in.readInt();
+
+        int rowCnt = in.readInt();
+
+        if (rowCnt == 0)
+            rows = Collections.emptyList();
+        else {
+            rows = new ArrayList<>(rowCnt);
+
+            int cols = in.readInt();
+            int dataSize = in.readInt();
+
+            Data data = Data.create(null, dataSize);
+
+            for (int r = 0; r < rowCnt; r++) {
+                Value[] row = new Value[cols];
+
+                for (int c = 0; c < cols; c++)
+                    row[c] = data.readValue();
+
+                rows.add(row);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
new file mode 100644
index 0000000..fe55114
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
@@ -0,0 +1,34 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import java.io.*;
+
+/**
+ * TODO write doc
+ */
+public class GridQueryAck implements Serializable {
+    /** */
+    private long reqId;
+
+    /**
+     * @param reqId Request ID.
+     */
+    public GridQueryAck(long reqId) {
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+}


[6/8] incubator-ignite git commit: ignite-gg9499 - fixes

Posted by se...@apache.org.
ignite-gg9499 - fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1226b240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1226b240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1226b240

Branch: refs/heads/ignite-gg9499
Commit: 1226b2401b58e0e75d80e01787c0cee39ce5aeee
Parents: 29823e6
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:09:04 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:09:04 2014 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  |  11 +-
 .../query/h2/twostep/GridMergeIndex.java        | 116 ++++++++++++++-----
 .../h2/twostep/GridMergeIndexUnsorted.java      |  66 ++++++-----
 .../query/h2/twostep/GridMergeTable.java        |   4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  34 +++---
 .../query/h2/twostep/GridResultPage.java        |  53 +++------
 .../twostep/messages/GridNextPageResponse.java  |  29 ++++-
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  34 +++++-
 8 files changed, 240 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b86caf1..92d1875 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -211,19 +211,26 @@ public class GridMapQueryExecutor {
 
         assert res != null;
 
+        boolean last = false;
+
         synchronized (res) {
             page = qr.pages[qry]++;
 
             for (int i = 0 ; i < pageSize; i++) {
-                if (!res.next())
+                if (!res.next()) {
+                    last = true;
+
                     break;
+                }
 
                 rows.add(res.currentRow());
             }
         }
 
         try {
-            ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows));
+            ctx.io().sendUserMessage(F.asList(node),
+                new GridNextPageResponse(qr.qryReqId, qry, page, allRows, last, rows),
+                GridTopic.TOPIC_QUERY, false, 0);
         }
         catch (IgniteCheckedException e) {
             log.error("Failed to send message.", e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 16ba15d..5d51b32 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -10,11 +10,14 @@
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
 import org.apache.ignite.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
 import org.h2.engine.*;
 import org.h2.index.*;
 import org.h2.message.*;
 import org.h2.result.*;
 import org.h2.table.*;
+import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -25,16 +28,32 @@ import java.util.concurrent.atomic.*;
  */
 public abstract class GridMergeIndex extends BaseIndex {
     /** */
+    protected final GridResultPage<?> END = new GridResultPage<Object>(null, null);
+
+    /** */
     private static final int MAX_FETCH_SIZE = 100000;
 
     /** */
     private final AtomicInteger cnt = new AtomicInteger(0);
 
+    /** Result sources. */
+    private final AtomicInteger srcs = new AtomicInteger(0);
+
     /**
      * Will be r/w from query execution thread only, does not need to be threadsafe.
      */
     private ArrayList<Row> fetched = new ArrayList<>();
 
+    /**
+     * @param tbl Table.
+     * @param name Index name.
+     * @param type Type.
+     * @param cols Columns.
+     */
+    public GridMergeIndex(GridMergeTable tbl, String name, IndexType type, IndexColumn[] cols) {
+        initBaseIndex(tbl, 0, name, cols, type);
+    }
+
     /** {@inheritDoc} */
     @Override public long getRowCount(Session session) {
         return cnt.get();
@@ -46,6 +65,13 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     * @param srcs Number of sources.
+     */
+    public void setNumberOfSources(int srcs) {
+        this.srcs.set(srcs);
+    }
+
+    /**
      * @param cnt Count.
      */
     public void addCount(int cnt) {
@@ -55,7 +81,26 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * @param page Page.
      */
-    public abstract void addPage(GridResultPage<?> page);
+    public final void addPage(GridResultPage<?> page) {
+        if (!page.response().rows().isEmpty())
+            addPage0(page);
+        else
+            assert page.response().isLast();
+
+        if (page.response().isLast()) {
+            int srcs0 = srcs.decrementAndGet();
+
+            assert srcs0 >= 0;
+
+            if (srcs0 == 0)
+                addPage0(END); // We've fetched all.
+        }
+    }
+
+    /**
+     * @param page Page.
+     */
+    protected abstract void addPage0(GridResultPage<?> page);
 
     /** {@inheritDoc} */
     @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
@@ -145,7 +190,7 @@ public abstract class GridMergeIndex extends BaseIndex {
      */
     protected class IteratorCursor implements Cursor {
         /** */
-        private Iterator<Row> iter;
+        protected Iterator<Row> iter;
 
         /** */
         protected Row cur;
@@ -185,47 +230,66 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * Fetching cursor.
      */
-    protected abstract class FetchingCursor extends IteratorCursor {
+    protected class FetchingCursor extends IteratorCursor {
         /** */
-        private boolean canFetch = true;
+        private Iterator<Row> stream;
 
         /**
          */
-        public FetchingCursor() {
-            super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator());
-        }
+        public FetchingCursor(Iterator<Row> stream) {
+            super(new FetchedIterator());
 
-        /**
-         * @return Next row or {@code null} if none available.
-         */
-        @Nullable protected abstract Row fetchNext();
+            assert stream != null;
+
+            this.stream = stream;
+        }
 
         /** {@inheritDoc} */
         @Override public boolean next() {
-            if (super.next())
+            if (super.next()) {
+                assert cur != null;
+
+                if (iter == stream && fetched != null) { // Cache fetched rows for reuse.
+                    if (fetched.size() == MAX_FETCH_SIZE)
+                        fetched = null; // Throw away fetched result if it is too large.
+                    else
+                        fetched.add(cur);
+                }
+
+                X.println("__ row: " + Arrays.toString(cur.getValueList()));
+
                 return true;
+            }
 
-            if (!canFetch)
+            if (iter == stream) // We've fetched the stream.
                 return false;
 
-            cur = fetchNext();
-
-            if (cur == null) { // No more results to fetch.
-                assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get();
+            iter = stream; // Switch from cached to stream.
 
-                canFetch = false;
+            return next();
+        }
+    }
 
-                return false;
-            }
+    /**
+     * List iterator without {@link ConcurrentModificationException}.
+     */
+    private class FetchedIterator implements Iterator<Row> {
+        /** */
+        private int idx;
 
-            if (fetched != null) { // Try to reuse fetched result.
-                fetched.add(cur);
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return fetched != null && idx < fetched.size();
+        }
 
-                if (fetched.size() == MAX_FETCH_SIZE)
-                    fetched = null; // Throw away fetched result if it is too large.
-            }
+        /** {@inheritDoc} */
+        @Override public Row next() {
+            return fetched.get(idx++);
+        }
 
-            return true;
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            throw new UnsupportedOperationException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index c6aaea9..b93c6ec 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -10,8 +10,10 @@
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
 import org.apache.ignite.*;
+import org.gridgain.grid.util.typedef.*;
 import org.h2.index.*;
 import org.h2.result.*;
+import org.h2.table.*;
 import org.h2.value.*;
 import org.jetbrains.annotations.*;
 
@@ -25,50 +27,60 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
     /** */
     private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
 
+    /**
+     * @param tbl  Table.
+     * @param name Index name.
+     */
+    public GridMergeIndexUnsorted(GridMergeTable tbl, String name) {
+        super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
+    }
+
     /** {@inheritDoc} */
-    @Override public void addPage(GridResultPage<?> page) {
+    @Override public void addPage0(GridResultPage<?> page) {
         queue.add(page);
     }
 
     /** {@inheritDoc} */
     @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
-        final GridResultPage<?> p = queue.poll();
-
-        assert p != null; // First page must be already fetched.
+        return new FetchingCursor(new Iterator<Row>() {
+            /** */
+            Iterator<Value[]> iter = Collections.emptyIterator();
 
-        if (p.isEmpty())
-            return new IteratorCursor(Collections.<Row>emptyIterator());
+            @Override public boolean hasNext() {
+                if (iter.hasNext())
+                    return true;
 
-        p.fetchNextPage(); // We always request next page before reading this one.
+                GridResultPage<?> page;
 
-        return new FetchingCursor() {
-            /** */
-            Iterator<Value[]> iter = p.rows().iterator();
+                try {
+                    page = queue.take();
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteException("Query execution was interrupted.", e);
+                }
 
-            @Nullable @Override protected Row fetchNext() {
-                if (!iter.hasNext()) {
-                    GridResultPage<?> page;
+                if (page == END) {
+                    assert queue.isEmpty() : "It must be the last page: " + queue;
 
-                    try {
-                        page = queue.take();
-                    }
-                    catch (InterruptedException e) {
-                        throw new IgniteException("Query execution was interrupted.", e);
-                    }
+                    return false; // We are done.
+                }
 
-                    if (page.isEmpty()) {
-                        assert queue.isEmpty() : "It must be the last page.";
+                page.fetchNextPage();
 
-                        return null; // Empty page - we are done.
-                    }
+                iter = page.response().rows().iterator();
 
-                    page.fetchNextPage();
+                assert iter.hasNext();
 
-                    iter = page.rows().iterator();
-                }
+                return true;
+            }
 
+            @Override public Row next() {
                 return new Row(iter.next(), 0);
             }
-        };
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
index a1f2213..683bb54 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -27,7 +27,7 @@ public class GridMergeTable extends TableBase {
     private final ArrayList<Index> idxs = new ArrayList<>(1);
 
     /** */
-    private final GridMergeIndex idx = new GridMergeIndexUnsorted();
+    private final GridMergeIndex idx;
 
     /**
      * @param data Data.
@@ -35,6 +35,8 @@ public class GridMergeTable extends TableBase {
     public GridMergeTable(CreateTableData data) {
         super(data);
 
+        idx = new GridMergeIndexUnsorted(this, "merge_scan");
+
         idxs.add(idx);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3e7e12c..a12b4f9 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,8 +59,8 @@ public class GridReduceQueryExecutor {
 
         // TODO handle node failure.
 
-        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+        ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
                 assert msg != null;
 
                 ClusterNode node = ctx.discovery().node(nodeId);
@@ -69,6 +69,8 @@ public class GridReduceQueryExecutor {
                     onNextPage(node, (GridNextPageResponse)msg);
                 else if (msg instanceof GridQueryFailResponse)
                     onFail(node, (GridQueryFailResponse)msg);
+
+                return true;
             }
         });
     }
@@ -86,7 +88,13 @@ public class GridReduceQueryExecutor {
 
         GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
 
-        idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) {
+        if (msg.allRows() != -1) { // Only the first page contains row count.
+            idx.addCount(msg.allRows());
+
+            r.latch.countDown();
+        }
+
+        idx.addPage(new GridResultPage<UUID>(node.id(), msg) {
             @Override public void fetchNextPage() {
                 try {
                     ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize));
@@ -96,12 +104,6 @@ public class GridReduceQueryExecutor {
                 }
             }
         });
-
-        if (msg.allRows() != -1) { // Only the first page contains row count.
-            idx.addCount(msg.allRows());
-
-            r.latch.countDown();
-        }
     }
 
     public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
@@ -118,17 +120,23 @@ public class GridReduceQueryExecutor {
             throw new IgniteException(e);
         }
 
-        for (GridCacheSqlQuery mapQry : qry.mapQueries())
-            r.tbls.add(createTable(r.conn, mapQry));
-
         Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow?
 
+        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+            GridMergeTable tbl = createTable(r.conn, mapQry);
+
+            tbl.getScanIndex(null).setNumberOfSources(nodes.size());
+
+            r.tbls.add(tbl);
+        }
+
         r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
 
         this.runs.put(qryReqId, r);
 
         try {
-            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size
+            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries()), // TODO conf page size
+                GridTopic.TOPIC_QUERY, false, 0);
 
             r.latch.await();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
index 6c6ab6a..ef52805 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -9,68 +9,51 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
-import org.h2.result.*;
-import org.h2.value.*;
-
-import java.util.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.typedef.internal.*;
 
 /**
  * Page result.
  */
-public abstract class GridResultPage<S> {
-    /** */
-    private final S src;
-
+public class GridResultPage<Z> {
     /** */
-    private final Collection<Value[]> rows;
+    private final Z src;
 
     /** */
-    private final int page;
+    private final GridNextPageResponse res;
 
     /**
      * @param src Source.
-     * @param page Page.
-     * @param rows Page rows.
+     * @param res Response.
      */
-    protected GridResultPage(S src, int page, Collection<Value[]> rows) {
-        assert src != null;
-        assert rows != null;
-
+    protected GridResultPage(Z src, GridNextPageResponse res) {
         this.src = src;
-        this.page = page;
-        this.rows = rows;
+        this.res = res;
     }
 
     /**
      * @return Result source.
      */
-    public S source() {
+    public Z source() {
         return src;
     }
 
     /**
-     * @return Page.
+     * @return Response.
      */
-    public int page() {
-        return page;
+    public GridNextPageResponse response() {
+        return res;
     }
 
     /**
-     * @return {@code true} If result is empty.
+     * Request next page.
      */
-    public boolean isEmpty() {
-        return rows.isEmpty();
+    public void fetchNextPage() {
+        throw new UnsupportedOperationException();
     }
 
-    /**
-     * @return Page rows.
-     */
-    public Collection<Value[]> rows() {
-        return rows;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridResultPage.class, this);
     }
-
-    /**
-     * Request next page.
-     */
-    public abstract void fetchNextPage();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
index de38172..0834f4c 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
 
+import org.gridgain.grid.util.typedef.internal.*;
 import org.h2.store.*;
 import org.h2.value.*;
 
@@ -34,20 +35,32 @@ public class GridNextPageResponse implements Externalizable {
     /** */
     private Collection<Value[]> rows;
 
+    /** */
+    private boolean last;
+
+    /**
+     * For {@link Externalizable}.
+     */
+    public GridNextPageResponse() {
+        // No-op.
+    }
+
     /**
      * @param qryReqId Query request ID.
      * @param qry Query.
      * @param page Page.
      * @param allRows All rows count.
+     * @param last Last row.
      * @param rows Rows.
      */
-    public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) {
+    public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last, Collection<Value[]> rows) {
         assert rows != null;
 
         this.qryReqId = qryReqId;
         this.qry = qry;
         this.page = page;
         this.allRows = allRows;
+        this.last = last;
         this.rows = rows;
     }
 
@@ -80,6 +93,13 @@ public class GridNextPageResponse implements Externalizable {
     }
 
     /**
+     * @return {@code true} If this is the last page.
+     */
+    public boolean isLast() {
+        return last;
+    }
+
+    /**
      * @return Rows.
      */
     public Collection<Value[]> rows() {
@@ -91,6 +111,7 @@ public class GridNextPageResponse implements Externalizable {
         out.writeLong(qryReqId);
         out.writeInt(qry);
         out.writeInt(page);
+        out.writeBoolean(last);
         out.writeInt(allRows);
 
         out.writeInt(rows.size());
@@ -122,6 +143,7 @@ public class GridNextPageResponse implements Externalizable {
         qryReqId = in.readLong();
         qry = in.readInt();
         page = in.readInt();
+        last = in.readBoolean();
         allRows = in.readInt();
 
         int rowCnt = in.readInt();
@@ -146,4 +168,9 @@ public class GridNextPageResponse implements Externalizable {
             }
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNextPageResponse.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index bc9dcf8..90a2d4c 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -18,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.util.typedef.*;
 import org.gridgain.testframework.junits.common.*;
 
 import java.util.*;
@@ -88,11 +90,37 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         return cc;
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoStep() throws Exception {
+        fillCaches();
+
+        GridCacheTwoStepQuery q = new GridCacheTwoStepQuery("select * from _cnts_");
+
+        q.addMapQuery("_cnts_", "select count(*) cnt from \"partitioned\".FactPurchase");
+
+        GridCacheQueriesEx<Integer, FactPurchase> qx =
+            (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache("partitioned").queries();
+
+        for (List<?> row : qx.execute(q).get())
+            X.println("__ "  + row);
+
+
+
+//        Object cnt = .next().get(0);
+//
+//        assertEquals(10L, cnt);
+    }
+
     /** @throws Exception If failed. */
     public void testOnProjection() throws Exception {
+        fillCaches();
+
         GridCacheProjection<Integer, FactPurchase> prj = ignite.<Integer, FactPurchase>cache("partitioned").projection(
             new IgnitePredicate<GridCacheEntry<Integer, FactPurchase>>() {
-                @Override public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
+                @Override
+                public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
                     return e.getKey() > 12;
                 }
             });
@@ -105,7 +133,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
     /**
      * @throws IgniteCheckedException If failed.
      */
-    private void fillCaches() throws IgniteCheckedException {
+    private void fillCaches() throws IgniteCheckedException, InterruptedException {
+        awaitPartitionMapExchange();
+
         int idGen = 0;
 
         GridCache<Integer, Object> dimCache = ignite.cache("replicated");


[2/8] incubator-ignite git commit: ignite-gg9499 -

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
new file mode 100644
index 0000000..ba5855e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -0,0 +1,46 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import java.io.*;
+
+/**
+ * Error message.
+ */
+public class GridQueryFailResponse implements Serializable {
+    /** */
+    private long qryReqId;
+
+    /** */
+    private Throwable err;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param err Error.
+     */
+    public GridQueryFailResponse(long qryReqId, Throwable err) {
+        this.qryReqId = qryReqId;
+        this.err = err;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Error.
+     */
+    public Throwable error() {
+        return err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java
new file mode 100644
index 0000000..7e2e9ad
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -0,0 +1,61 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import org.gridgain.grid.kernal.processors.cache.query.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Query request.
+ */
+public class GridQueryRequest implements Serializable {
+    /** */
+    private long reqId;
+
+    /** */
+    private int pageSize;
+
+    /** */
+    private Collection<GridCacheSqlQuery> qrys;
+
+    /**
+     * @param reqId Request ID.
+     * @param pageSize Page size.
+     * @param qrys Queries.
+     */
+    public GridQueryRequest(long reqId, int pageSize, Collection<GridCacheSqlQuery> qrys) {
+        this.reqId = reqId;
+        this.pageSize = pageSize;
+        this.qrys = qrys;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return Queries.
+     */
+    public Collection<GridCacheSqlQuery> queries() {
+        return qrys;
+    }
+}